Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B4D0D116BD for ; Tue, 22 Apr 2014 21:09:00 +0000 (UTC) Received: (qmail 82573 invoked by uid 500); 22 Apr 2014 21:08:53 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 82135 invoked by uid 500); 22 Apr 2014 21:08:41 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 81863 invoked by uid 99); 22 Apr 2014 21:08:37 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Apr 2014 21:08:37 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id A24638C2D38; Tue, 22 Apr 2014 21:08:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: busbey@apache.org To: commits@accumulo.apache.org Date: Tue, 22 Apr 2014 21:08:48 -0000 Message-Id: <7f598f54bc5c464db42dbe1be8f775a0@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [13/15] git commit: ACCUMULO-2654 adds an IT to the functional suite to cover recovery via empty rfile. ACCUMULO-2654 adds an IT to the functional suite to cover recovery via empty rfile. Had to update to use bcfile.Compression directly, since TFile ceased to be. 
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/35b0549b Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/35b0549b Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/35b0549b Branch: refs/heads/master Commit: 35b0549ba562334e46944d2a1fbe406bf96349a4 Parents: 1ed463e Author: Sean Busbey Authored: Wed Apr 16 17:01:29 2014 -0500 Committer: Sean Busbey Committed: Tue Apr 22 15:30:23 2014 -0500 ---------------------------------------------------------------------- .../accumulo/core/file/rfile/CreateEmpty.java | 11 +- .../core/file/rfile/bcfile/Compression.java | 2 +- .../accumulo/test/functional/ReadWriteIT.java | 2 +- .../functional/RecoveryWithEmptyRFileIT.java | 144 +++++++++++++++++++ 4 files changed, 153 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/35b0549b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java index 09a2d61..058bb84 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java @@ -24,11 +24,12 @@ import org.apache.accumulo.core.cli.Help; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.file.rfile.RFile.Writer; -import org.apache.accumulo.core.file.rfile.bcfile.TFile; +import org.apache.accumulo.core.file.rfile.bcfile.Compression; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import 
org.apache.log4j.Logger; import com.beust.jcommander.IParameterValidator; import com.beust.jcommander.Parameter; @@ -38,6 +39,7 @@ import com.beust.jcommander.ParameterException; * Create an empty RFile for use in recovering from data loss where Accumulo still refers internally to a path. */ public class CreateEmpty { + private static final Logger log = Logger.getLogger(CreateEmpty.class); public static class NamedLikeRFile implements IParameterValidator { @Override @@ -51,16 +53,16 @@ public class CreateEmpty { public static class IsSupportedCompressionAlgorithm implements IParameterValidator { @Override public void validate(String name, String value) throws ParameterException { - String[] algorithms = TFile.getSupportedCompressionAlgorithms(); + String[] algorithms = Compression.getSupportedAlgorithms(); if (!((Arrays.asList(algorithms)).contains(value))) { - throw new ParameterException("Compression codec must be one of " + Arrays.toString(TFile.getSupportedCompressionAlgorithms())); + throw new ParameterException("Compression codec must be one of " + Arrays.toString(algorithms)); } } } static class Opts extends Help { @Parameter(names = {"-c", "--codec"}, description = "the compression codec to use.", validateWith = IsSupportedCompressionAlgorithm.class) - String codec = TFile.COMPRESSION_NONE; + String codec = Compression.COMPRESSION_NONE; @Parameter(description = " { ... } Each path given is a URL. 
Relative paths are resolved according to the default filesystem defined in your Hadoop configuration, which is usually an HDFS instance.", required = true, validateWith = NamedLikeRFile.class) List files = new ArrayList(); } @@ -73,6 +75,7 @@ public class CreateEmpty { for (String arg : opts.files) { Path path = new Path(arg); + log.info("Writing to file '" + path + "'"); FileSKVWriter writer = (new RFileOperations()).openWriter(arg, path.getFileSystem(conf), conf, DefaultConfiguration.getDefaultConfiguration(), opts.codec); writer.close(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/35b0549b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java index 66ca07f..5288bbb 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java @@ -378,7 +378,7 @@ public final class Compression { throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName); } - static String[] getSupportedAlgorithms() { + public static String[] getSupportedAlgorithms() { Algorithm[] algos = Algorithm.class.getEnumConstants(); ArrayList ret = new ArrayList(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/35b0549b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java index e4fe57c..6dbecfa 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java +++ 
b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java @@ -100,7 +100,7 @@ public class ReadWriteIT extends ConfigurableMacIT { TestIngest.ingest(connector, opts, new BatchWriterOpts()); } - private static void verify(Connector connector, int rows, int cols, int width, int offset) throws Exception { + public static void verify(Connector connector, int rows, int cols, int width, int offset) throws Exception { verify(connector, rows, cols, width, offset, COLF); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/35b0549b/test/src/test/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java b/test/src/test/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java new file mode 100644 index 0000000..abe6950 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.net.URL; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.rfile.CreateEmpty; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.monitor.Monitor; +import org.apache.accumulo.server.util.Admin; +import org.apache.accumulo.test.functional.ReadWriteIT; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.TestMultiTableIngest; +import org.apache.accumulo.test.VerifyIngest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import 
org.apache.log4j.Logger; +import org.junit.Test; + +/** + XXX As a part of verifying lossy recovery via inserting an empty rfile, + this test deletes test table tablets. This will require write access to + the backing files of the test Accumulo mini cluster. + + This test should read the file location from the test harness and that + file should be on the local filesystem. If you want to take a paranoid + approach just make sure the test user doesn't have write access to the + HDFS files of any colocated live Accumulo instance or any important + local filesystem files.. +*/ +public class RecoveryWithEmptyRFileIT extends ConfigurableMacIT { + private static final Logger log = Logger.getLogger(RecoveryWithEmptyRFileIT.class); + + static final int ROWS = 200000; + static final int COLS = 1; + static final String COLF = "colf"; + + @Override + protected int defaultTimeoutSeconds() { + return 2 * 60; + } + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.useMiniDFS(true); + } + + @Test + public void replaceMissingRFile() throws Exception { + log.info("Ingest some data, verify it was stored properly, replace an underlying rfile with an empty one and verify we can scan."); + Connector connector = getConnector(); + ReadWriteIT.ingest(connector, ROWS, COLS, 50, 0); + ReadWriteIT.verify(connector, ROWS, COLS, 50, 0); + + connector.tableOperations().flush("test_ingest", null, null, true); + connector.tableOperations().offline("test_ingest", true); + + log.debug("Replacing rfile(s) with empty"); + Scanner meta = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + String tableId = connector.tableOperations().tableIdMap().get("test_ingest"); + meta.setRange(new Range(new Text(tableId + ";"), new Text(tableId + "<"))); + meta.fetchColumnFamily(DataFileColumnFamily.NAME); + boolean foundFile = false; + for (Entry entry : meta) { + foundFile = true; + Path rfile = new 
Path(entry.getKey().getColumnQualifier().toString()); + log.debug("Removing rfile '" + rfile + "'"); + cluster.getFileSystem().delete(rfile, false); + Process info = cluster.exec(CreateEmpty.class, rfile.toString()); + assertEquals(0, info.waitFor()); + } + meta.close(); + assertTrue(foundFile); + + if(log.isTraceEnabled()) { + log.trace("Enumerating backing filesystem paths:"); + RemoteIterator paths = cluster.getFileSystem().listFiles(new Path("/"), true); + while (paths.hasNext()) { + FileStatus path = paths.next(); + log.trace(path.toString()); + } + } + log.trace("invalidate cached file handles by issuing a compaction"); + connector.tableOperations().online("test_ingest", true); + connector.tableOperations().compact("test_ingest", null, null, false, true); + + log.debug("make sure we can still scan"); + Scanner scan = connector.createScanner("test_ingest", Authorizations.EMPTY); + scan.setRange(new Range()); + long cells = 0l; + for (Entry entry : scan) { + cells++; + } + scan.close(); + assertEquals(0l, cells); + FileSystem.closeAll(); + } + +}