From commits-return-22330-archive-asf-public=cust-asf.ponee.io@accumulo.apache.org Thu Nov 15 23:46:27 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id E352218067C for ; Thu, 15 Nov 2018 23:46:26 +0100 (CET) Received: (qmail 23061 invoked by uid 500); 15 Nov 2018 22:46:26 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 22906 invoked by uid 99); 15 Nov 2018 22:46:26 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Nov 2018 22:46:26 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 63B21857BE; Thu, 15 Nov 2018 22:46:25 +0000 (UTC) Date: Thu, 15 Nov 2018 22:46:27 +0000 To: "commits@accumulo.apache.org" Subject: [accumulo] 02/03: Merge branch '1.9' MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: mwalch@apache.org In-Reply-To: <154232198524.13098.2405376580632551036@gitbox.apache.org> References: <154232198524.13098.2405376580632551036@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: accumulo X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Rev: 9e40d02f6eceb7b0264fdbcc3b1411adc9eb8da6 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20181115224625.63B21857BE@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. mwalch pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 9e40d02f6eceb7b0264fdbcc3b1411adc9eb8da6 Merge: 7703394 61a6eb8 Author: Mike Walch AuthorDate: Thu Nov 15 17:28:49 2018 -0500 Merge branch '1.9' .../base/src/main/java/org/apache/accumulo/server/ServerUtil.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --cc server/base/src/main/java/org/apache/accumulo/server/ServerUtil.java index c9719fb,0000000..f7d1a5f mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/ServerUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServerUtil.java @@@ -1,281 -1,0 +1,283 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Map.Entry; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.AddressUtil; +import org.apache.accumulo.core.volume.Volume; +import org.apache.accumulo.fate.ReadOnlyStore; +import org.apache.accumulo.fate.ReadOnlyTStore; +import org.apache.accumulo.fate.ZooStore; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ServerUtil { + + private static final Logger log = LoggerFactory.getLogger(ServerUtil.class); + + public static synchronized void updateAccumuloVersion(VolumeManager fs, int oldVersion) { + for (Volume volume : fs.getVolumes()) { + try { + if (getAccumuloPersistentVersion(volume) == oldVersion) { + log.debug("Attempting to upgrade {}", volume); + Path dataVersionLocation = ServerConstants.getDataVersionLocation(volume); + fs.create(new Path(dataVersionLocation, Integer.toString(ServerConstants.DATA_VERSION))) + .close(); + // TODO document failure mode & recovery if FS permissions cause above to work and below + // to fail ACCUMULO-2596 + Path prevDataVersionLoc = new Path(dataVersionLocation, Integer.toString(oldVersion)); + if (!fs.delete(prevDataVersionLoc)) { + throw new RuntimeException("Could not delete previous data version location (" + + prevDataVersionLoc + ") for " + volume); + } + } + } catch (IOException e) { + throw new RuntimeException("Unable to set accumulo version: an error occurred.", e); + } + } + } + + public static synchronized int getAccumuloPersistentVersion(FileSystem fs, Path path) { + int dataVersion; + try { + FileStatus[] files = fs.listStatus(path); + if (files == null || files.length == 0) { + dataVersion = -1; // assume it is 0.5 or earlier + } else { + dataVersion = Integer.parseInt(files[0].getPath().getName()); + } + return dataVersion; + } catch (IOException e) { + throw new RuntimeException("Unable to read accumulo version: an error occurred.", e); + } + } + + public static synchronized int getAccumuloPersistentVersion(Volume v) { + Path path = ServerConstants.getDataVersionLocation(v); + return getAccumuloPersistentVersion(v.getFileSystem(), path); + } + + public static synchronized int getAccumuloPersistentVersion(VolumeManager fs) { + // It doesn't matter which Volume is used as they should all have the data version stored + return getAccumuloPersistentVersion(fs.getVolumes().iterator().next()); + } + + public static synchronized Path getAccumuloInstanceIdPath(VolumeManager fs) { + // It doesn't matter which Volume is used as they should all have the instance ID stored + Volume v = fs.getVolumes().iterator().next(); + return ServerConstants.getInstanceIdLocation(v); + } + + public static void init(ServerContext context, String application) { + final AccumuloConfiguration conf = context.getConfiguration(); + + log.info("{} starting", application); + log.info("Instance {}", context.getInstanceID()); + int dataVersion = ServerUtil.getAccumuloPersistentVersion(context.getVolumeManager()); + log.info("Data Version {}", dataVersion); + ServerUtil.waitForZookeeperAndHdfs(context); + + if (!(canUpgradeFromDataVersion(dataVersion))) { + throw new RuntimeException("This version of accumulo (" + Constants.VERSION + + ") is not compatible with files stored using data version " + dataVersion); + } + + TreeMap sortedProps = new TreeMap<>(); + for (Entry entry : conf) + sortedProps.put(entry.getKey(), entry.getValue()); + + for (Entry entry : sortedProps.entrySet()) { + String key = entry.getKey(); + log.info("{} = {}", key, (Property.isSensitive(key) ? "" : entry.getValue())); + Property prop = Property.getPropertyByKey(key); + if (prop != null && conf.isPropertySet(prop, false)) { + if (prop.isDeprecated()) { + Property replacedBy = prop.replacedBy(); + if (replacedBy != null) { + log.warn("{} is deprecated, use {} instead.", prop.getKey(), replacedBy.getKey()); + } else { + log.warn("{} is deprecated", prop.getKey()); + } + } + } + } + + monitorSwappiness(conf); + + // Encourage users to configure TLS + final String SSL = "SSL"; + for (Property sslProtocolProperty : Arrays.asList(Property.RPC_SSL_CLIENT_PROTOCOL, + Property.RPC_SSL_ENABLED_PROTOCOLS, Property.MONITOR_SSL_INCLUDE_PROTOCOLS)) { + String value = conf.get(sslProtocolProperty); + if (value.contains(SSL)) { + log.warn("It is recommended that {} only allow TLS", sslProtocolProperty); + } + } + } + + /** + * Sanity check that the current persistent version is allowed to upgrade to the version of + * Accumulo running. + * + * @param dataVersion + * the version that is persisted in the backing Volumes + */ + public static boolean canUpgradeFromDataVersion(final int dataVersion) { + return ServerConstants.CAN_UPGRADE.get(dataVersion); + } + + /** + * Does the data version number stored in the backing Volumes indicate we need to upgrade + * something? + */ + public static boolean persistentVersionNeedsUpgrade(final int accumuloPersistentVersion) { + return ServerConstants.NEEDS_UPGRADE.get(accumuloPersistentVersion); + } + + public static void monitorSwappiness(AccumuloConfiguration config) { + SimpleTimer.getInstance(config).schedule(() -> { + try { + String procFile = "/proc/sys/vm/swappiness"; + File swappiness = new File(procFile); + if (swappiness.exists() && swappiness.canRead()) { + try (InputStream is = new FileInputStream(procFile)) { + byte[] buffer = new byte[10]; + int bytes = is.read(buffer); + String setting = new String(buffer, 0, bytes, UTF_8); + setting = setting.trim(); + if (bytes > 0 && Integer.parseInt(setting) > 10) { + log.warn("System swappiness setting is greater than ten ({})" + + " which can cause time-sensitive operations to be delayed." + + " Accumulo is time sensitive because it needs to maintain" + + " distributed lock agreement.", setting); + } + } + } + } catch (Throwable t) { + log.error("", t); + } + }, 1000, 10 * 60 * 1000); + } + + public static void waitForZookeeperAndHdfs(ServerContext context) { + log.info("Attempting to talk to zookeeper"); + while (true) { + try { + context.getZooReaderWriter().getChildren(Constants.ZROOT); + break; + } catch (InterruptedException e) { + // ignored + } catch (KeeperException ex) { + log.info("Waiting for accumulo to be initialized"); + sleepUninterruptibly(1, TimeUnit.SECONDS); + } + } + log.info("ZooKeeper connected and initialized, attempting to talk to HDFS"); + long sleep = 1000; + int unknownHostTries = 3; + while (true) { + try { + if (context.getVolumeManager().isReady()) + break; + log.warn("Waiting for the NameNode to leave safemode"); + } catch (IOException ex) { + log.warn("Unable to connect to HDFS", ex); + } catch (IllegalArgumentException exception) { + /* Unwrap the UnknownHostException so we can deal with it directly */ + if (exception.getCause() instanceof UnknownHostException) { + if (unknownHostTries > 0) { + log.warn("Unable to connect to HDFS, will retry. cause: {}", exception.getCause()); + /* + * We need to make sure our sleep period is long enough to avoid getting a cached + * failure of the host lookup. + */ + sleep = Math.max(sleep, + (AddressUtil + .getAddressCacheNegativeTtl((UnknownHostException) (exception.getCause())) + 1) + * 1000); + } else { + log.error("Unable to connect to HDFS and have exceeded the maximum number of retries.", + exception); + throw exception; + } + unknownHostTries--; + } else { + throw exception; + } + } + log.info("Backing off due to failure; current sleep period is {} seconds", sleep / 1000.); + sleepUninterruptibly(sleep, TimeUnit.MILLISECONDS); + /* Back off to give transient failures more time to clear. */ + sleep = Math.min(60 * 1000, sleep * 2); + } + log.info("Connected to HDFS"); + } + + /** + * Exit loudly if there are outstanding Fate operations. Since Fate serializes class names, we + * need to make sure there are no queued transactions from a previous version before continuing an + * upgrade. The status of the operations is irrelevant; those in SUCCESSFUL status cause the same + * problem as those just queued. + * + * Note that the Master should not allow write access to Fate until after all upgrade steps are + * complete. + * + * Should be called as a guard before performing any upgrade steps, after determining that an + * upgrade is needed. + * + * see ACCUMULO-2519 + */ + public static void abortIfFateTransactions(ServerContext context) { + try { + final ReadOnlyTStore fate = new ReadOnlyStore<>(new ZooStore<>( + context.getZooKeeperRoot() + Constants.ZFATE, context.getZooReaderWriter())); + if (!(fate.list().isEmpty())) { + throw new AccumuloException("Aborting upgrade because there are" + + " outstanding FATE transactions from a previous Accumulo version." - + " Please see the README document for instructions on what to do under" - + " your previous version."); ++ + " You can start the tservers and then use the shell to delete completed " ++ + " transactions. If there are uncomplete transactions, you will need to roll" ++ + " back and fix those issues. Please see the Accumulo User manual, " ++ + " Troubleshooting, Upgrade Issues for more information. "); + } + } catch (Exception exception) { + log.error("Problem verifying Fate readiness", exception); + System.exit(1); + } + } +}