From commits-return-22583-archive-asf-public=cust-asf.ponee.io@accumulo.apache.org Thu Feb 7 22:34:16 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 30FB518067C for ; Thu, 7 Feb 2019 23:34:15 +0100 (CET) Received: (qmail 36348 invoked by uid 500); 7 Feb 2019 22:34:14 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 36339 invoked by uid 99); 7 Feb 2019 22:34:14 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Feb 2019 22:34:14 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 6D43B80A78; Thu, 7 Feb 2019 22:34:13 +0000 (UTC) Date: Thu, 07 Feb 2019 22:34:14 +0000 To: "commits@accumulo.apache.org" Subject: [accumulo] 01/01: Merge remote-tracking branch 'upstream/1.9' MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: mwalch@apache.org In-Reply-To: <154957885328.26599.14085268396147430362@gitbox.apache.org> References: <154957885328.26599.14085268396147430362@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: accumulo X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Rev: 218de58b52a3eb43201d46e34f639c6a6a7dd58a X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20190207223413.6D43B80A78@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. mwalch pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 218de58b52a3eb43201d46e34f639c6a6a7dd58a Merge: 27ca87b 61c4491 Author: Mike Walch AuthorDate: Thu Feb 7 17:33:00 2019 -0500 Merge remote-tracking branch 'upstream/1.9' .../miniclusterImpl/MiniAccumuloClusterImpl.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --cc minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index 5037f35,0000000..e7bda12 mode 100644,000000..100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@@ -1,859 -1,0 +1,871 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.miniclusterImpl; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.accumulo.cluster.AccumuloCluster; +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.clientImpl.MasterClient; +import org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException; +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ClientProperty; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.core.master.thrift.MasterClientService; +import org.apache.accumulo.core.master.thrift.MasterGoalState; +import org.apache.accumulo.core.master.thrift.MasterMonitorInfo; +import org.apache.accumulo.core.trace.Tracer; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil; +import org.apache.accumulo.master.state.SetGoalState; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.ServerUtil; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.init.Initialize; +import org.apache.accumulo.server.util.AccumuloStatus; +import org.apache.accumulo.server.util.PortUtils; +import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory; +import org.apache.accumulo.start.Main; +import org.apache.accumulo.start.classloader.vfs.MiniDFSUtil; +import org.apache.commons.io.IOUtils; +import org.apache.commons.vfs2.FileObject; +import org.apache.commons.vfs2.impl.VFSClassLoader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.thrift.TException; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +/** + * This class provides the backing implementation for {@link MiniAccumuloCluster}, and may contain + * features for internal testing which have not yet been promoted to the public API. It's best to + * use {@link MiniAccumuloCluster} whenever possible. Use of this class risks API breakage between + * versions. + * + * @since 1.6.0 + */ +public class MiniAccumuloClusterImpl implements AccumuloCluster { + private static final Logger log = LoggerFactory.getLogger(MiniAccumuloClusterImpl.class); + + private boolean initialized = false; + + private Set> debugPorts = new HashSet<>(); + + private File zooCfgFile; + private String dfsUri; + private SiteConfiguration siteConfig; + private ServerContext context; + private Properties clientProperties; + + private MiniAccumuloConfigImpl config; + private MiniDFSCluster miniDFS = null; + private List cleanup = new ArrayList<>(); + + private ExecutorService executor; + + private MiniAccumuloClusterControl clusterControl; + + File getZooCfgFile() { + return zooCfgFile; + } + + public ProcessInfo exec(Class clazz, String... args) throws IOException { + return exec(clazz, null, args); + } + + public ProcessInfo exec(Class clazz, List jvmArgs, String... args) throws IOException { + ArrayList jvmArgs2 = new ArrayList<>(1 + (jvmArgs == null ? 0 : jvmArgs.size())); + jvmArgs2.add("-Xmx" + config.getDefaultMemory()); + if (jvmArgs != null) + jvmArgs2.addAll(jvmArgs); + return _exec(clazz, jvmArgs2, args); + } + + private boolean containsConfigFile(File f) { + if (!f.isDirectory()) { + return false; + } else { + File[] files = f.listFiles(pathname -> pathname.getName().endsWith("site.xml") + || pathname.getName().equals("accumulo.properties")); + return files != null && files.length > 0; + } + } + + @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", + justification = "mini runs in the same security context as user providing the url") + private void append(StringBuilder classpathBuilder, URL url) throws URISyntaxException { + File file = new File(url.toURI()); + // do not include dirs containing hadoop or accumulo config files + if (!containsConfigFile(file)) + classpathBuilder.append(File.pathSeparator).append(file.getAbsolutePath()); + } + + private String getClasspath() throws IOException { + + try { + ArrayList classloaders = new ArrayList<>(); + + ClassLoader cl = this.getClass().getClassLoader(); + + while (cl != null) { + classloaders.add(cl); + cl = cl.getParent(); + } + + Collections.reverse(classloaders); + + StringBuilder classpathBuilder = new StringBuilder(); + classpathBuilder.append(config.getConfDir().getAbsolutePath()); + + if (config.getHadoopConfDir() != null) + classpathBuilder.append(File.pathSeparator) + .append(config.getHadoopConfDir().getAbsolutePath()); + + if (config.getClasspathItems() == null) { + + // assume 0 is the system classloader and skip it + for (int i = 1; i < classloaders.size(); i++) { + ClassLoader classLoader = classloaders.get(i); + + if (classLoader instanceof URLClassLoader) { + + for (URL u : ((URLClassLoader) classLoader).getURLs()) { + append(classpathBuilder, u); + } + + } else if (classLoader instanceof VFSClassLoader) { + + VFSClassLoader vcl = (VFSClassLoader) classLoader; + for (FileObject f : vcl.getFileObjects()) { + append(classpathBuilder, f.getURL()); + } + } else { - throw new IllegalArgumentException( - "Unknown classloader type : " + classLoader.getClass().getName()); ++ if (classLoader.getClass().getName() ++ .equals("jdk.internal.loader.ClassLoaders$AppClassLoader")) { ++ log.debug("Detected Java 11 classloader: {}", classLoader.getClass().getName()); ++ } else { ++ log.debug("Detected unknown classloader: {}", classLoader.getClass().getName()); ++ } ++ String javaClassPath = System.getProperty("java.class.path"); ++ if (javaClassPath == null) { ++ throw new IllegalStateException("java.class.path is not set"); ++ } else { ++ log.debug("Using classpath set by java.class.path system property: {}", ++ javaClassPath); ++ } ++ classpathBuilder.append(File.pathSeparator).append(javaClassPath); + } + } + } else { + for (String s : config.getClasspathItems()) + classpathBuilder.append(File.pathSeparator).append(s); + } + + return classpathBuilder.toString(); + + } catch (URISyntaxException e) { + throw new IOException(e); + } + } + + public static class ProcessInfo { + + private final Process process; + private final File stdOut; + + public ProcessInfo(Process process, File stdOut) { + this.process = process; + this.stdOut = stdOut; + } + + public Process getProcess() { + return process; + } + + public String readStdOut() { + try (InputStream in = new FileInputStream(stdOut)) { + return IOUtils.toString(in, StandardCharsets.UTF_8); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + @SuppressFBWarnings(value = "COMMAND_INJECTION", + justification = "mini runs in the same security context as user providing the args") + private ProcessInfo _exec(Class clazz, List extraJvmOpts, String... args) + throws IOException { + String javaHome = System.getProperty("java.home"); + String javaBin = javaHome + File.separator + "bin" + File.separator + "java"; + String classpath = getClasspath(); + + String className = clazz.getName(); + + ArrayList argList = new ArrayList<>(); + argList.addAll(Arrays.asList(javaBin, "-Dproc=" + clazz.getSimpleName(), "-cp", classpath)); + argList.addAll(extraJvmOpts); + for (Entry sysProp : config.getSystemProperties().entrySet()) { + argList.add(String.format("-D%s=%s", sysProp.getKey(), sysProp.getValue())); + } + // @formatter:off + argList.addAll(Arrays.asList( + "-XX:+UseConcMarkSweepGC", + "-XX:CMSInitiatingOccupancyFraction=75", + "-Dapple.awt.UIElement=true", + "-Djava.net.preferIPv4Stack=true", + "-XX:+PerfDisableSharedMem", + "-XX:+AlwaysPreTouch", + Main.class.getName(), className)); + // @formatter:on + argList.addAll(Arrays.asList(args)); + + ProcessBuilder builder = new ProcessBuilder(argList); + + builder.environment().put("ACCUMULO_HOME", config.getDir().getAbsolutePath()); + builder.environment().put("ACCUMULO_LOG_DIR", config.getLogDir().getAbsolutePath()); + builder.environment().put("ACCUMULO_CLIENT_CONF_PATH", + config.getClientConfFile().getAbsolutePath()); + String ldLibraryPath = Joiner.on(File.pathSeparator).join(config.getNativeLibPaths()); + builder.environment().put("LD_LIBRARY_PATH", ldLibraryPath); + builder.environment().put("DYLD_LIBRARY_PATH", ldLibraryPath); + + // if we're running under accumulo.start, we forward these env vars + String env = System.getenv("HADOOP_HOME"); + if (env != null) + builder.environment().put("HADOOP_HOME", env); + env = System.getenv("ZOOKEEPER_HOME"); + if (env != null) + builder.environment().put("ZOOKEEPER_HOME", env); + builder.environment().put("ACCUMULO_CONF_DIR", config.getConfDir().getAbsolutePath()); + if (config.getHadoopConfDir() != null) + builder.environment().put("HADOOP_CONF_DIR", config.getHadoopConfDir().getAbsolutePath()); + + log.debug("Starting MiniAccumuloCluster process with class: " + clazz.getSimpleName() + + "\n, jvmOpts: " + extraJvmOpts + "\n, classpath: " + classpath + "\n, args: " + argList + + "\n, environment: " + builder.environment()); + + Integer hashcode = builder.hashCode(); + + File stdOut = new File(config.getLogDir(), clazz.getSimpleName() + "_" + hashcode + ".out"); + File stdErr = new File(config.getLogDir(), clazz.getSimpleName() + "_" + hashcode + ".err"); + + Process process = builder.redirectError(stdErr).redirectOutput(stdOut).start(); + + cleanup.add(process); + + return new ProcessInfo(process, stdOut); + } + + public ProcessInfo _exec(Class clazz, ServerType serverType, + Map configOverrides, String... args) throws IOException { + List jvmOpts = new ArrayList<>(); + jvmOpts.add("-Xmx" + config.getMemory(serverType)); + if (configOverrides != null && !configOverrides.isEmpty()) { + File siteFile = Files.createTempFile(config.getConfDir().toPath(), "accumulo", ".properties") + .toFile(); + Map confMap = new HashMap<>(); + confMap.putAll(config.getSiteConfig()); + confMap.putAll(configOverrides); + writeConfigProperties(siteFile, confMap); + jvmOpts.add("-Daccumulo.properties=" + siteFile.getName()); + } + + if (config.isJDWPEnabled()) { + Integer port = PortUtils.getRandomFreePort(); + jvmOpts.addAll(buildRemoteDebugParams(port)); + debugPorts.add(new Pair<>(serverType, port)); + } + return _exec(clazz, jvmOpts, args); + } + + /** + * + * @param dir + * An empty or nonexistent temp directory that Accumulo and Zookeeper can store data in. + * Creating the directory is left to the user. Java 7, Guava, and Junit provide methods + * for creating temporary directories. + * @param rootPassword + * Initial root password for instance. + */ + public MiniAccumuloClusterImpl(File dir, String rootPassword) throws IOException { + this(new MiniAccumuloConfigImpl(dir, rootPassword)); + } + + /** + * @param config + * initial configuration + */ + @SuppressWarnings("deprecation") + public MiniAccumuloClusterImpl(MiniAccumuloConfigImpl config) throws IOException { + + this.config = config.initialize(); + + mkdirs(config.getConfDir()); + mkdirs(config.getLogDir()); + mkdirs(config.getLibDir()); + mkdirs(config.getLibExtDir()); + + if (!config.useExistingInstance()) { + if (!config.useExistingZooKeepers()) + mkdirs(config.getZooKeeperDir()); + mkdirs(config.getAccumuloDir()); + } + + if (config.useMiniDFS()) { + File nn = new File(config.getAccumuloDir(), "nn"); + mkdirs(nn); + File dn = new File(config.getAccumuloDir(), "dn"); + mkdirs(dn); + File dfs = new File(config.getAccumuloDir(), "dfs"); + mkdirs(dfs); + Configuration conf = new Configuration(); + conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, nn.getAbsolutePath()); + conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dn.getAbsolutePath()); + conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "1"); + conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, "1"); + conf.set("dfs.support.append", "true"); + conf.set("dfs.datanode.synconclose", "true"); + conf.set("dfs.datanode.data.dir.perm", MiniDFSUtil.computeDatanodeDirectoryPermission()); + String oldTestBuildData = System.setProperty("test.build.data", dfs.getAbsolutePath()); + miniDFS = new MiniDFSCluster.Builder(conf).build(); + if (oldTestBuildData == null) + System.clearProperty("test.build.data"); + else + System.setProperty("test.build.data", oldTestBuildData); + miniDFS.waitClusterUp(); + InetSocketAddress dfsAddress = miniDFS.getNameNode().getNameNodeAddress(); + dfsUri = "hdfs://" + dfsAddress.getHostName() + ":" + dfsAddress.getPort(); + File coreFile = new File(config.getConfDir(), "core-site.xml"); + writeConfig(coreFile, Collections.singletonMap("fs.default.name", dfsUri).entrySet()); + File hdfsFile = new File(config.getConfDir(), "hdfs-site.xml"); + writeConfig(hdfsFile, conf); + + Map siteConfig = config.getSiteConfig(); + siteConfig.put(Property.INSTANCE_DFS_URI.getKey(), dfsUri); + siteConfig.put(Property.INSTANCE_DFS_DIR.getKey(), "/accumulo"); + config.setSiteConfig(siteConfig); + } else if (config.useExistingInstance()) { + dfsUri = config.getHadoopConfiguration().get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY); + } else { + dfsUri = "file:///"; + } + + File clientConfFile = config.getClientConfFile(); + // Write only the properties that correspond to ClientConfiguration properties + writeConfigProperties(clientConfFile, + Maps.filterEntries(config.getSiteConfig(), + v -> org.apache.accumulo.core.client.ClientConfiguration.ClientProperty + .getPropertyByKey(v.getKey()) != null)); + + Map clientProps = config.getClientProps(); + clientProps.put(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(), config.getZooKeepers()); + clientProps.put(ClientProperty.INSTANCE_NAME.getKey(), config.getInstanceName()); + if (!clientProps.containsKey(ClientProperty.AUTH_TYPE.getKey())) { + clientProps.put(ClientProperty.AUTH_TYPE.getKey(), "password"); + clientProps.put(ClientProperty.AUTH_PRINCIPAL.getKey(), config.getRootUserName()); + clientProps.put(ClientProperty.AUTH_TOKEN.getKey(), config.getRootPassword()); + } + + File clientPropsFile = config.getClientPropsFile(); + writeConfigProperties(clientPropsFile, clientProps); + + File siteFile = new File(config.getConfDir(), "accumulo.properties"); + writeConfigProperties(siteFile, config.getSiteConfig()); + siteConfig = new SiteConfiguration(siteFile); + + if (!config.useExistingInstance() && !config.useExistingZooKeepers()) { + zooCfgFile = new File(config.getConfDir(), "zoo.cfg"); + FileWriter fileWriter = new FileWriter(zooCfgFile); + + // zookeeper uses Properties to read its config, so use that to write in order to properly + // escape things like Windows paths + Properties zooCfg = new Properties(); + zooCfg.setProperty("tickTime", "2000"); + zooCfg.setProperty("initLimit", "10"); + zooCfg.setProperty("syncLimit", "5"); + zooCfg.setProperty("clientPortAddress", "127.0.0.1"); + zooCfg.setProperty("clientPort", config.getZooKeeperPort() + ""); + zooCfg.setProperty("maxClientCnxns", "1000"); + zooCfg.setProperty("dataDir", config.getZooKeeperDir().getAbsolutePath()); + zooCfg.store(fileWriter, null); + + fileWriter.close(); + } + clusterControl = new MiniAccumuloClusterControl(this); + } + + private static void mkdirs(File dir) { + if (!dir.mkdirs()) { + log.warn("Unable to create {}", dir); + } + } + + private void writeConfig(File file, Iterable> settings) + throws IOException { + FileWriter fileWriter = new FileWriter(file); + fileWriter.append("\n"); + + for (Entry entry : settings) { + String value = entry.getValue().replace("&", "&").replace("<", "<").replace(">", + ">"); + fileWriter.append( + "" + entry.getKey() + "" + value + "\n"); + } + fileWriter.append("\n"); + fileWriter.close(); + } + + private void writeConfigProperties(File file, Map settings) throws IOException { + FileWriter fileWriter = new FileWriter(file); + + for (Entry entry : settings.entrySet()) + fileWriter.append(entry.getKey() + "=" + entry.getValue() + "\n"); + fileWriter.close(); + } + + /** + * Starts Accumulo and Zookeeper processes. Can only be called once. + */ + @SuppressFBWarnings(value = "UNENCRYPTED_SOCKET", + justification = "insecure socket used for reservation") + @Override + public synchronized void start() throws IOException, InterruptedException { + if (config.useMiniDFS() && miniDFS == null) { + throw new IllegalStateException("Cannot restart mini when using miniDFS"); + } + + MiniAccumuloClusterControl control = getClusterControl(); + + if (config.useExistingInstance()) { + AccumuloConfiguration acuConf = config.getAccumuloConfiguration(); + Configuration hadoopConf = config.getHadoopConfiguration(); + + ConfigurationCopy cc = new ConfigurationCopy(acuConf); + VolumeManager fs; + try { + fs = VolumeManagerImpl.get(cc, hadoopConf); + } catch (IOException e) { + throw new RuntimeException(e); + } + Path instanceIdPath = ServerUtil.getAccumuloInstanceIdPath(fs); + + String instanceIdFromFile = ZooUtil.getInstanceIDFromHdfs(instanceIdPath, cc, hadoopConf); + IZooReaderWriter zrw = new ZooReaderWriterFactory().getZooReaderWriter( + cc.get(Property.INSTANCE_ZK_HOST), (int) cc.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT), + cc.get(Property.INSTANCE_SECRET)); + + String rootPath = ZooUtil.getRoot(instanceIdFromFile); + + String instanceName = null; + try { + for (String name : zrw.getChildren(Constants.ZROOT + Constants.ZINSTANCES)) { + String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + name; + byte[] bytes = zrw.getData(instanceNamePath, new Stat()); + String iid = new String(bytes, UTF_8); + if (iid.equals(instanceIdFromFile)) { + instanceName = name; + } + } + } catch (KeeperException e) { + throw new RuntimeException("Unable to read instance name from zookeeper.", e); + } + if (instanceName == null) + throw new RuntimeException("Unable to read instance name from zookeeper."); + + config.setInstanceName(instanceName); + if (!AccumuloStatus.isAccumuloOffline(zrw, rootPath)) + throw new RuntimeException( + "The Accumulo instance being used is already running. Aborting."); + } else { + if (!initialized) { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + MiniAccumuloClusterImpl.this.stop(); + } catch (IOException e) { + log.error("IOException while attempting to stop the MiniAccumuloCluster.", e); + } catch (InterruptedException e) { + log.error("The stopping of MiniAccumuloCluster was interrupted.", e); + } + })); + } + + if (!config.useExistingZooKeepers()) + control.start(ServerType.ZOOKEEPER); + + if (!initialized) { + if (!config.useExistingZooKeepers()) { + // sleep a little bit to let zookeeper come up before calling init, seems to work better + long startTime = System.currentTimeMillis(); + while (true) { + Socket s = null; + try { + s = new Socket("localhost", config.getZooKeeperPort()); + s.setReuseAddress(true); + s.getOutputStream().write("ruok\n".getBytes()); + s.getOutputStream().flush(); + byte buffer[] = new byte[100]; + int n = s.getInputStream().read(buffer); + if (n >= 4 && new String(buffer, 0, 4).equals("imok")) + break; + } catch (Exception e) { + if (System.currentTimeMillis() - startTime >= config.getZooKeeperStartupTime()) { + throw new ZooKeeperBindException("Zookeeper did not start within " + + (config.getZooKeeperStartupTime() / 1000) + " seconds. Check the logs in " + + config.getLogDir() + " for errors. Last exception: " + e); + } + // Don't spin absurdly fast + sleepUninterruptibly(250, TimeUnit.MILLISECONDS); + } finally { + if (s != null) + s.close(); + } + } + } + + LinkedList args = new LinkedList<>(); + args.add("--instance-name"); + args.add(config.getInstanceName()); + args.add("--user"); + args.add(config.getRootUserName()); + args.add("--clear-instance-name"); + + // If we aren't using SASL, add in the root password + final String saslEnabled = config.getSiteConfig() + .get(Property.INSTANCE_RPC_SASL_ENABLED.getKey()); + if (saslEnabled == null || !Boolean.parseBoolean(saslEnabled)) { + args.add("--password"); + args.add(config.getRootPassword()); + } + + Process initProcess = exec(Initialize.class, args.toArray(new String[0])).getProcess(); + int ret = initProcess.waitFor(); + if (ret != 0) { + throw new RuntimeException("Initialize process returned " + ret + ". Check the logs in " + + config.getLogDir() + " for errors."); + } + initialized = true; + } + } + + log.info("Starting MAC against instance {} and zookeeper(s) {}.", config.getInstanceName(), + config.getZooKeepers()); + + control.start(ServerType.TABLET_SERVER); + + int ret = 0; + for (int i = 0; i < 5; i++) { + ret = exec(Main.class, SetGoalState.class.getName(), MasterGoalState.NORMAL.toString()) + .getProcess().waitFor(); + if (ret == 0) + break; + sleepUninterruptibly(1, TimeUnit.SECONDS); + } + if (ret != 0) { + throw new RuntimeException("Could not set master goal state, process returned " + ret + + ". Check the logs in " + config.getLogDir() + " for errors."); + } + + control.start(ServerType.MASTER); + control.start(ServerType.GARBAGE_COLLECTOR); + + if (executor == null) { + executor = Executors.newSingleThreadExecutor(); + } + } + + private List buildRemoteDebugParams(int port) { + return Arrays.asList("-Xdebug", + String.format("-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=%d", port)); + } + + /** + * @return generated remote debug ports if in debug mode. + * @since 1.6.0 + */ + public Set> getDebugPorts() { + return debugPorts; + } + + List references(Process... procs) { + List result = new ArrayList<>(); + for (Process proc : procs) { + result.add(new ProcessReference(proc)); + } + return result; + } + + public Map> getProcesses() { + Map> result = new HashMap<>(); + MiniAccumuloClusterControl control = getClusterControl(); + result.put(ServerType.MASTER, references(control.masterProcess)); + result.put(ServerType.TABLET_SERVER, + references(control.tabletServerProcesses.toArray(new Process[0]))); + if (control.zooKeeperProcess != null) { + result.put(ServerType.ZOOKEEPER, references(control.zooKeeperProcess)); + } + if (control.gcProcess != null) { + result.put(ServerType.GARBAGE_COLLECTOR, references(control.gcProcess)); + } + return result; + } + + public void killProcess(ServerType type, ProcessReference proc) + throws ProcessNotFoundException, InterruptedException { + getClusterControl().killProcess(type, proc); + } + + @Override + public String getInstanceName() { + return config.getInstanceName(); + } + + @Override + public String getZooKeepers() { + return config.getZooKeepers(); + } + + @Override + public synchronized ServerContext getServerContext() { + if (context == null) { + context = new ServerContext(siteConfig); + } + return context; + } + + /** + * Stops Accumulo and Zookeeper processes. If stop is not called, there is a shutdown hook that is + * setup to kill the processes. However it's probably best to call stop in a finally block as soon + * as possible. + */ + @Override + public synchronized void stop() throws IOException, InterruptedException { + if (executor == null) { + // keep repeated calls to stop() from failing + return; + } + + MiniAccumuloClusterControl control = getClusterControl(); + + control.stop(ServerType.GARBAGE_COLLECTOR, null); + control.stop(ServerType.MASTER, null); + control.stop(ServerType.TABLET_SERVER, null); + control.stop(ServerType.ZOOKEEPER, null); + + // ACCUMULO-2985 stop the ExecutorService after we finished using it to stop accumulo procs + if (executor != null) { + List tasksRemaining = executor.shutdownNow(); + + // the single thread executor shouldn't have any pending tasks, but check anyways + if (!tasksRemaining.isEmpty()) { + log.warn( + "Unexpectedly had {} task(s) remaining in threadpool for execution when being stopped", + tasksRemaining.size()); + } + + executor = null; + } + + if (config.useMiniDFS() && miniDFS != null) + miniDFS.shutdown(); + for (Process p : cleanup) { + p.destroy(); + p.waitFor(); + } + miniDFS = null; + } + + /** + * @since 1.6.0 + */ + public MiniAccumuloConfigImpl getConfig() { + return config; + } + + @Override + public AccumuloClient createAccumuloClient(String user, AuthenticationToken token) { + return Accumulo.newClient().from(getClientProperties()).as(user, token).build(); + } + + @SuppressWarnings("deprecation") + @Override + public org.apache.accumulo.core.client.ClientConfiguration getClientConfig() { + return org.apache.accumulo.core.client.ClientConfiguration.fromMap(config.getSiteConfig()) + .withInstance(this.getInstanceName()).withZkHosts(this.getZooKeepers()); + } + + @Override + public synchronized Properties getClientProperties() { + if (clientProperties == null) { + clientProperties = Accumulo.newClientProperties().from(config.getClientPropsFile().toPath()) + .build(); + } + return clientProperties; + } + + @Override + public FileSystem getFileSystem() { + try { + return FileSystem.get(new URI(dfsUri), new Configuration()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @VisibleForTesting + protected void setShutdownExecutor(ExecutorService svc) { + this.executor = svc; + } + + @VisibleForTesting + protected ExecutorService getShutdownExecutor() { + return executor; + } + + int stopProcessWithTimeout(final Process proc, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + FutureTask future = new FutureTask<>(() -> { + proc.destroy(); + return proc.waitFor(); + }); + + executor.execute(future); + + return future.get(timeout, unit); + } + + /** + * Get programmatic interface to information available in a normal monitor. XXX the returned + * structure won't contain information about the metadata table until there is data in it. e.g. if + * you want to see the metadata table you should create a table. + * + * @since 1.6.1 + */ + public MasterMonitorInfo getMasterMonitorInfo() + throws AccumuloException, AccumuloSecurityException { + MasterClientService.Iface client = null; + while (true) { + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + client = MasterClient.getConnectionWithRetry((ClientContext) c); + return client.getMasterStats(Tracer.traceInfo(), ((ClientContext) c).rpcCreds()); + } catch (ThriftSecurityException exception) { + throw new AccumuloSecurityException(exception); + } catch (ThriftNotActiveServiceException e) { + // Let it loop, fetching a new location + log.debug("Contacted a Master which is no longer active, retrying"); + sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } catch (TException exception) { + throw new AccumuloException(exception); + } finally { + if (client != null) { + MasterClient.close(client); + } + } + } + } + + public synchronized MiniDFSCluster getMiniDfs() { + return this.miniDFS; + } + + @Override + public MiniAccumuloClusterControl getClusterControl() { + return clusterControl; + } + + @Override + public Path getTemporaryPath() { + if (config.useMiniDFS()) { + return new Path("/tmp/"); + } else { + File tmp = new File(config.getDir(), "tmp"); + mkdirs(tmp); + return new Path(tmp.toString()); + } + } + + @Override + public AccumuloConfiguration getSiteConfiguration() { + return new ConfigurationCopy( + Iterables.concat(DefaultConfiguration.getInstance(), config.getSiteConfig().entrySet())); + } + + @Override + public String getAccumuloPropertiesPath() { + return new File(config.getConfDir(), "accumulo.properties").toString(); + } +}