Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D86BE200C4B for ; Mon, 20 Mar 2017 15:49:01 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id D7088160B81; Mon, 20 Mar 2017 14:49:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BBFAA160B92 for ; Mon, 20 Mar 2017 15:48:59 +0100 (CET) Received: (qmail 33273 invoked by uid 500); 20 Mar 2017 14:48:58 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 33119 invoked by uid 99); 20 Mar 2017 14:48:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Mar 2017 14:48:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4E776DFFC2; Mon, 20 Mar 2017 14:48:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kturner@apache.org To: commits@accumulo.apache.org Date: Mon, 20 Mar 2017 14:48:59 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/9] accumulo git commit: ACCUMULO-4501 ACCUMULO-96 Added Summarization archived-at: Mon, 20 Mar 2017 14:49:02 -0000 http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/util/CancelFlagFuture.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/util/CancelFlagFuture.java 
b/core/src/main/java/org/apache/accumulo/core/util/CancelFlagFuture.java new file mode 100644 index 0000000..9da91c0 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/util/CancelFlagFuture.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.accumulo.core.util; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A simple future wrapper that will set an atomic boolean to true if a future is successfully canceled + */ +public class CancelFlagFuture implements Future { + + private Future wrappedFuture; + private AtomicBoolean cancelFlag; + + public CancelFlagFuture(Future wrappedFuture, AtomicBoolean cancelFlag) { + this.wrappedFuture = wrappedFuture; + this.cancelFlag = cancelFlag; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean ret = wrappedFuture.cancel(mayInterruptIfRunning); + if (ret) { + cancelFlag.set(true); + } + return ret; + } + + @Override + public boolean isCancelled() { + return wrappedFuture.isCancelled(); + } + + @Override + public boolean isDone() { + return wrappedFuture.isDone(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return wrappedFuture.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return wrappedFuture.get(timeout, unit); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java b/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java new file mode 100644 index 0000000..c417b0f --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Supplier; + +public class CompletableFutureUtil { + + // create a binary tree of completable future operations, where each node in the tree merges the results of their children when complete + public static CompletableFuture merge(List> futures, BiFunction mergeFunc, Supplier nothing) { + if (futures.size() == 0) { + return CompletableFuture.completedFuture(nothing.get()); + } + while (futures.size() > 1) { + ArrayList> mergedFutures = new ArrayList<>(futures.size() / 2); + for (int i = 0; i < futures.size(); i += 2) { + if (i + 1 == futures.size()) { + mergedFutures.add(futures.get(i)); + } else { + mergedFutures.add(futures.get(i).thenCombine(futures.get(i + 1), mergeFunc)); + } + } + + futures = mergedFutures; + } + + return futures.get(0); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/thrift/data.thrift ---------------------------------------------------------------------- diff --git a/core/src/main/thrift/data.thrift b/core/src/main/thrift/data.thrift index 3e5c56c..648c37d 100644 --- a/core/src/main/thrift/data.thrift +++ 
b/core/src/main/thrift/data.thrift @@ -149,6 +149,40 @@ struct TConditionalSession { 3:i64 ttl; } +struct TSummarizerConfiguration { + 1:string classname + 2:map options + 3:string configId +} + +struct TSummary { + 1:map summary + 2:TSummarizerConfiguration config + 3:i64 filesContaining + 4:i64 filesExceeding + 5:i64 filesLarge +} + +struct TSummaries { + 1:bool finished + 2:i64 sessionId + 3:i64 totalFiles + 4:i64 deletedFiles + 5:list summaries +} + +struct TRowRange { + 1:binary startRow + 2:binary endRow +} + +struct TSummaryRequest { + 1:string tableId + 2:TRowRange bounds + 3:list summarizers + 4:string summarizerPattern +} + typedef map> CMBatch typedef map> UpdateBatch http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/thrift/tabletserver.thrift ---------------------------------------------------------------------- diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift index 7697a2d..b56449f 100644 --- a/core/src/main/thrift/tabletserver.thrift +++ b/core/src/main/thrift/tabletserver.thrift @@ -230,6 +230,11 @@ service TabletClientService extends client.ClientService { list getActiveCompactions(2:trace.TInfo tinfo, 1:security.TCredentials credentials) throws (1:client.ThriftSecurityException sec) oneway void removeLogs(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:list filenames) list getActiveLogs(1:trace.TInfo tinfo, 2:security.TCredentials credentials) + + data.TSummaries startGetSummaries(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:data.TSummaryRequest request) throws (1:client.ThriftSecurityException sec, 2:client.ThriftTableOperationException tope) + data.TSummaries startGetSummariesForPartition(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:data.TSummaryRequest request, 4:i32 modulus, 5:i32 remainder) throws (1:client.ThriftSecurityException sec) + data.TSummaries startGetSummariesFromFiles(1:trace.TInfo tinfo, 2:security.TCredentials 
credentials, 3:data.TSummaryRequest request, 4:map> files) throws (1:client.ThriftSecurityException sec) + data.TSummaries contiuneGetSummaries(1:trace.TInfo tinfo, 2:i64 sessionId) throws (1:NoSuchScanIDException nssi) } typedef i32 TabletID http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java index 1d699c2..496cde3 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java @@ -27,6 +27,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; +import java.util.function.Predicate; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -37,8 +38,10 @@ import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.DiskUsage; import org.apache.accumulo.core.client.admin.Locations; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.SummaryRetriever; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.security.Authorizations; @@ -249,6 +252,29 @@ public class TableOperationsHelperTest { public Locations locate(String tableName, Collection ranges) throws AccumuloException, 
AccumuloSecurityException, TableNotFoundException { throw new UnsupportedOperationException(); } + + @Override + public SummaryRetriever summaries(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + throw new UnsupportedOperationException(); + } + + @Override + public void addSummarizers(String tableName, SummarizerConfiguration... summarizerConf) throws TableNotFoundException, AccumuloException, + AccumuloSecurityException { + throw new UnsupportedOperationException(); + + } + + @Override + public void removeSummarizers(String tableName, Predicate predicate) throws AccumuloException, TableNotFoundException, + AccumuloSecurityException { + throw new UnsupportedOperationException(); + } + + @Override + public List listSummarizers(String tableName) throws AccumuloException, TableNotFoundException, AccumuloSecurityException { + throw new UnsupportedOperationException(); + } } protected TableOperationsHelper getHelper() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java index d85db92..367c833 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java @@ -17,12 +17,18 @@ package org.apache.accumulo.core.client.mapred; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Collection; import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator; import org.apache.accumulo.core.client.sample.RowSampler; import 
org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.client.summary.CountingSummarizer; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer; +import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; @@ -43,6 +49,9 @@ public class AccumuloFileOutputFormatTest { samplerConfig.addOption("hasher", "murmur3_32"); samplerConfig.addOption("modulus", "109"); + SummarizerConfiguration sc1 = SummarizerConfiguration.builder(VisibilitySummarizer.class).addOption(CountingSummarizer.MAX_COUNTERS_OPT, 2048).build(); + SummarizerConfiguration sc2 = SummarizerConfiguration.builder(FamilySummarizer.class).addOption(CountingSummarizer.MAX_COUNTERS_OPT, 256).build(); + JobConf job = new JobConf(); AccumuloFileOutputFormat.setReplication(job, a); AccumuloFileOutputFormat.setFileBlockSize(job, b); @@ -50,6 +59,7 @@ public class AccumuloFileOutputFormatTest { AccumuloFileOutputFormat.setIndexBlockSize(job, d); AccumuloFileOutputFormat.setCompressionType(job, e); AccumuloFileOutputFormat.setSampler(job, samplerConfig); + AccumuloFileOutputFormat.setSummarizers(job, sc1, sc2); AccumuloConfiguration acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class, job); @@ -60,6 +70,11 @@ public class AccumuloFileOutputFormatTest { assertEquals("snappy", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE)); assertEquals(new SamplerConfigurationImpl(samplerConfig), SamplerConfigurationImpl.newSamplerConfig(acuconf)); + Collection summarizerConfigs = SummarizerConfiguration.fromTableProperties(acuconf); + assertEquals(2, summarizerConfigs.size()); + assertTrue(summarizerConfigs.contains(sc1)); + 
assertTrue(summarizerConfigs.contains(sc2)); + a = 17; b = 1300l; c = 150l; @@ -85,5 +100,8 @@ public class AccumuloFileOutputFormatTest { assertEquals(110l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX)); assertEquals("lzo", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE)); assertEquals(new SamplerConfigurationImpl(samplerConfig), SamplerConfigurationImpl.newSamplerConfig(acuconf)); + + summarizerConfigs = SummarizerConfiguration.fromTableProperties(acuconf); + assertEquals(0, summarizerConfigs.size()); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java index 39d226b..5143453 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java @@ -17,12 +17,18 @@ package org.apache.accumulo.core.client.mapreduce; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Collection; import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator; import org.apache.accumulo.core.client.sample.RowSampler; import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.client.summary.CountingSummarizer; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer; +import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer; import org.apache.accumulo.core.conf.AccumuloConfiguration; 
import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; @@ -43,6 +49,9 @@ public class AccumuloFileOutputFormatTest { samplerConfig.addOption("hasher", "murmur3_32"); samplerConfig.addOption("modulus", "109"); + SummarizerConfiguration sc1 = SummarizerConfiguration.builder(VisibilitySummarizer.class).addOption(CountingSummarizer.MAX_COUNTERS_OPT, 2048).build(); + SummarizerConfiguration sc2 = SummarizerConfiguration.builder(FamilySummarizer.class).addOption(CountingSummarizer.MAX_COUNTERS_OPT, 256).build(); + Job job1 = Job.getInstance(); AccumuloFileOutputFormat.setReplication(job1, a); AccumuloFileOutputFormat.setFileBlockSize(job1, b); @@ -50,6 +59,7 @@ public class AccumuloFileOutputFormatTest { AccumuloFileOutputFormat.setIndexBlockSize(job1, d); AccumuloFileOutputFormat.setCompressionType(job1, e); AccumuloFileOutputFormat.setSampler(job1, samplerConfig); + AccumuloFileOutputFormat.setSummarizers(job1, sc1, sc2); AccumuloConfiguration acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class, job1.getConfiguration()); @@ -60,6 +70,11 @@ public class AccumuloFileOutputFormatTest { assertEquals("snappy", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE)); assertEquals(new SamplerConfigurationImpl(samplerConfig), SamplerConfigurationImpl.newSamplerConfig(acuconf)); + Collection summarizerConfigs = SummarizerConfiguration.fromTableProperties(acuconf); + assertEquals(2, summarizerConfigs.size()); + assertTrue(summarizerConfigs.contains(sc1)); + assertTrue(summarizerConfigs.contains(sc2)); + a = 17; b = 1300l; c = 150l; @@ -86,5 +101,8 @@ public class AccumuloFileOutputFormatTest { assertEquals("lzo", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE)); assertEquals(new SamplerConfigurationImpl(samplerConfig), SamplerConfigurationImpl.newSamplerConfig(acuconf)); + summarizerConfigs = SummarizerConfiguration.fromTableProperties(acuconf); + assertEquals(0, 
summarizerConfigs.size()); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java index 4993810..3352b0f 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -36,6 +37,11 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.sample.RowSampler; import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.client.summary.CounterSummary; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.core.client.summary.Summary; +import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer; +import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.ArrayByteSequence; @@ -83,6 +89,10 @@ public class RFileTest { } private SortedMap createTestData(int startRow, int rows, int startFamily, int families, int qualifiers) { + return createTestData(startRow, rows, startFamily, families, qualifiers, ""); + } + + private SortedMap createTestData(int startRow, int rows, int startFamily, int families, int qualifiers, String... 
vis) { TreeMap testData = new TreeMap<>(); for (int r = 0; r < rows; r++) { @@ -91,8 +101,10 @@ public class RFileTest { String fam = colStr(f + startFamily); for (int q = 0; q < qualifiers; q++) { String qual = colStr(q); - Key k = new Key(row, fam, qual); - testData.put(k, new Value((k.hashCode() + "").getBytes())); + for (String v : vis) { + Key k = new Key(row, fam, qual, v); + testData.put(k, new Value((k.hashCode() + "").getBytes())); + } } } } @@ -498,6 +510,148 @@ public class RFileTest { scanner.close(); } + @Test + public void testSummaries() throws Exception { + SummarizerConfiguration sc1 = SummarizerConfiguration.builder(VisibilitySummarizer.class).build(); + SummarizerConfiguration sc2 = SummarizerConfiguration.builder(FamilySummarizer.class).build(); + + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + String testFile = createTmpTestFile(); + + SortedMap testData1 = createTestData(0, 100, 0, 4, 1, "A&B", "A&B&C"); + + RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).withSummarizers(sc1, sc2).build(); + writer.append(testData1.entrySet()); + writer.close(); + + // verify summary data + Collection summaries = RFile.summaries().from(testFile).withFileSystem(localFs).read(); + Assert.assertEquals(2, summaries.size()); + for (Summary summary : summaries) { + Assert.assertEquals(0, summary.getFileStatistics().getInaccurate()); + Assert.assertEquals(1, summary.getFileStatistics().getTotal()); + String className = summary.getSummarizerConfiguration().getClassName(); + CounterSummary counterSummary = new CounterSummary(summary); + if (className.equals(FamilySummarizer.class.getName())) { + Map counters = counterSummary.getCounters(); + Map expected = ImmutableMap.of("0000", 200l, "0001", 200l, "0002", 200l, "0003", 200l); + Assert.assertEquals(expected, counters); + } else if (className.equals(VisibilitySummarizer.class.getName())) { + Map counters = counterSummary.getCounters(); + Map expected = 
ImmutableMap.of("A&B", 400l, "A&B&C", 400l); + Assert.assertEquals(expected, counters); + } else { + Assert.fail("Unexpected classname " + className); + } + } + + // check if writing summary data impacted normal rfile functionality + Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withAuthorizations(new Authorizations("A", "B", "C")).build(); + Assert.assertEquals(testData1, toMap(scanner)); + scanner.close(); + + String testFile2 = createTmpTestFile(); + SortedMap testData2 = createTestData(100, 100, 0, 4, 1, "A&B", "A&B&C"); + writer = RFile.newWriter().to(testFile2).withFileSystem(localFs).withSummarizers(sc1, sc2).build(); + writer.append(testData2.entrySet()); + writer.close(); + + // verify reading summaries from multiple files works + summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).read(); + Assert.assertEquals(2, summaries.size()); + for (Summary summary : summaries) { + Assert.assertEquals(0, summary.getFileStatistics().getInaccurate()); + Assert.assertEquals(2, summary.getFileStatistics().getTotal()); + String className = summary.getSummarizerConfiguration().getClassName(); + CounterSummary counterSummary = new CounterSummary(summary); + if (className.equals(FamilySummarizer.class.getName())) { + Map counters = counterSummary.getCounters(); + Map expected = ImmutableMap.of("0000", 400l, "0001", 400l, "0002", 400l, "0003", 400l); + Assert.assertEquals(expected, counters); + } else if (className.equals(VisibilitySummarizer.class.getName())) { + Map counters = counterSummary.getCounters(); + Map expected = ImmutableMap.of("A&B", 800l, "A&B&C", 800l); + Assert.assertEquals(expected, counters); + } else { + Assert.fail("Unexpected classname " + className); + } + } + + // verify reading a subset of summaries works + summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).read(); + checkSummaries(summaries, ImmutableMap.of("A&B", 800l, 
"A&B&C", 800l), 0); + + // the following test check boundry conditions for start row and end row + summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(99)).read(); + checkSummaries(summaries, ImmutableMap.of("A&B", 400l, "A&B&C", 400l), 0); + + summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(98)).read(); + checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 1); + + summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(0)).read(); + checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 1); + + summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow("#").read(); + checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 0); + + summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(100)).read(); + checkSummaries(summaries, ImmutableMap.of("A&B", 400l, "A&B&C", 400l), 1); + + summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(99)).read(); + checkSummaries(summaries, ImmutableMap.of("A&B", 400l, "A&B&C", 400l), 0); + + summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(100)).read(); + checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 1); + + summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(199)).read(); + checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 0); + + summaries = RFile.summaries().from(testFile, 
testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(50)) + .endRow(rowStr(150)).read(); + checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 2); + + summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(120)) + .endRow(rowStr(150)).read(); + checkSummaries(summaries, ImmutableMap.of("A&B", 400l, "A&B&C", 400l), 1); + + summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(50)) + .endRow(rowStr(199)).read(); + checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 1); + + summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow("#").endRow(rowStr(150)) + .read(); + checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 1); + + summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(199)).read(); + checkSummaries(summaries, ImmutableMap.of(), 0); + summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(200)).read(); + checkSummaries(summaries, ImmutableMap.of(), 0); + + summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).endRow("#").read(); + checkSummaries(summaries, ImmutableMap.of(), 0); + + summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(0)).read(); + checkSummaries(summaries, ImmutableMap.of("A&B", 400l, "A&B&C", 400l), 1); + } + + private void checkSummaries(Collection summaries, Map expected, int extra) { + Assert.assertEquals(1, summaries.size()); + for (Summary summary : summaries) { + Assert.assertEquals(extra, 
summary.getFileStatistics().getInaccurate()); + Assert.assertEquals(extra, summary.getFileStatistics().getExtra()); + Assert.assertEquals(2, summary.getFileStatistics().getTotal()); + String className = summary.getSummarizerConfiguration().getClassName(); + CounterSummary counterSummary = new CounterSummary(summary); + if (className.equals(VisibilitySummarizer.class.getName())) { + Map counters = counterSummary.getCounters(); + + Assert.assertEquals(expected, counters); + } else { + Assert.fail("Unexpected classname " + className); + } + } + } + @Test(expected = IllegalArgumentException.class) public void testOutOfOrder() throws Exception { // test that exception declared in API is thrown http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/client/summary/CountingSummarizerTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/summary/CountingSummarizerTest.java b/core/src/test/java/org/apache/accumulo/core/client/summary/CountingSummarizerTest.java new file mode 100644 index 0000000..06f8d35 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/client/summary/CountingSummarizerTest.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.client.summary; + +import static org.apache.accumulo.core.client.summary.CountingSummarizer.COUNTER_STAT_PREFIX; +import static org.apache.accumulo.core.client.summary.CountingSummarizer.DELETES_IGNORED_STAT; +import static org.apache.accumulo.core.client.summary.CountingSummarizer.EMITTED_STAT; +import static org.apache.accumulo.core.client.summary.CountingSummarizer.INGNORE_DELETES_OPT; +import static org.apache.accumulo.core.client.summary.CountingSummarizer.MAX_COUNTERS_OPT; +import static org.apache.accumulo.core.client.summary.CountingSummarizer.MAX_COUNTER_LEN_OPT; +import static org.apache.accumulo.core.client.summary.CountingSummarizer.SEEN_STAT; +import static org.apache.accumulo.core.client.summary.CountingSummarizer.TOO_LONG_STAT; +import static org.apache.accumulo.core.client.summary.CountingSummarizer.TOO_MANY_STAT; + +import java.util.Arrays; +import java.util.HashMap; + +import org.apache.accumulo.core.client.summary.CounterSummary; +import org.apache.accumulo.core.client.summary.CountingSummarizer; +import org.apache.accumulo.core.client.summary.Summarizer; +import org.apache.accumulo.core.client.summary.Summarizer.Collector; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer; +import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.junit.Assert; +import org.junit.Test; + +public class CountingSummarizerTest { + + public static class MultiSummarizer extends CountingSummarizer { + @Override + protected Converter converter() { + return (k, v, c) -> { + c.accept("rp:" + k.getRowData().subSequence(0, 2).toString()); + c.accept("fp:" + k.getColumnFamilyData().subSequence(0, 
2).toString()); + c.accept("qp:" + k.getColumnQualifierData().subSequence(0, 2).toString()); + }; + } + } + + @Test + public void testMultipleEmit() { + SummarizerConfiguration sc = SummarizerConfiguration.builder(MultiSummarizer.class).build(); + MultiSummarizer countSum = new MultiSummarizer(); + + Summarizer.Collector collector = countSum.collector(sc); + + Value val = new Value("abc"); + + HashMap expected = new HashMap<>(); + + for (String row : new String[] {"ask", "asleep", "some", "soul"}) { + for (String fam : new String[] {"hop", "hope", "nope", "noop"}) { + for (String qual : new String[] {"mad", "lad", "lab", "map"}) { + collector.accept(new Key(row, fam, qual), val); + + expected.merge("rp:" + row.substring(0, 2), 1l, Long::sum); + expected.merge("fp:" + fam.substring(0, 2), 1l, Long::sum); + expected.merge("qp:" + qual.substring(0, 2), 1l, Long::sum); + } + } + } + + HashMap stats = new HashMap<>(); + collector.summarize((k, v) -> stats.put(k, v)); + + CounterSummary csum = new CounterSummary(stats); + Assert.assertEquals(expected, csum.getCounters()); + Assert.assertEquals(64, csum.getSeen()); + Assert.assertEquals(3 * 64, csum.getEmitted()); + Assert.assertEquals(0, csum.getIgnored()); + Assert.assertEquals(0, csum.getDeletesIgnored()); + } + + @Test + public void testSummarizing() { + SummarizerConfiguration sc = SummarizerConfiguration.builder(FamilySummarizer.class).addOptions(MAX_COUNTERS_OPT, "5", MAX_COUNTER_LEN_OPT, "10").build(); + FamilySummarizer countSum = new FamilySummarizer(); + + Value val = new Value("abc"); + + Summarizer.Collector collector = countSum.collector(sc); + for (String fam : Arrays.asList("f1", "f1", "f1", "f2", "f1", "f70000000000000000000", "f70000000000000000001", "f2", "f3", "f4", "f5", "f6", "f7", "f3", + "f7")) { + collector.accept(new Key("r", fam), val); + } + + Key dk = new Key("r", "f2"); + dk.setDeleted(true); + collector.accept(dk, new Value("")); + + HashMap stats = new HashMap<>(); + collector.summarize((k, 
v) -> stats.put(k, v)); + + String p = COUNTER_STAT_PREFIX; + + HashMap expected = new HashMap<>(); + expected.put(p + "f1", 4l); + expected.put(p + "f2", 2l); + expected.put(p + "f3", 2l); + expected.put(p + "f4", 1l); + expected.put(p + "f5", 1l); + expected.put(TOO_LONG_STAT, 2l); + expected.put(TOO_MANY_STAT, 3l); + expected.put(SEEN_STAT, 16l); + expected.put(EMITTED_STAT, 15l); + expected.put(DELETES_IGNORED_STAT, 1l); + + Assert.assertEquals(expected, stats); + + CounterSummary csum = new CounterSummary(stats); + Assert.assertEquals(5, csum.getIgnored()); + Assert.assertEquals(3, csum.getTooMany()); + Assert.assertEquals(2, csum.getTooLong()); + Assert.assertEquals(16, csum.getSeen()); + Assert.assertEquals(15, csum.getEmitted()); + Assert.assertEquals(1, csum.getDeletesIgnored()); + + expected.clear(); + expected.put("f1", 4l); + expected.put("f2", 2l); + expected.put("f3", 2l); + expected.put("f4", 1l); + expected.put("f5", 1l); + Assert.assertEquals(expected, csum.getCounters()); + + } + + @Test + public void testMerge() { + SummarizerConfiguration sc = SummarizerConfiguration.builder(VisibilitySummarizer.class).addOption(MAX_COUNTERS_OPT, "5").build(); + VisibilitySummarizer countSum = new VisibilitySummarizer(); + + String p = COUNTER_STAT_PREFIX; + + HashMap sm1 = new HashMap<>(); + sm1.put(p + "f001", 9l); + sm1.put(p + "f002", 4l); + sm1.put(p + "f003", 2l); + sm1.put(p + "f004", 1l); + sm1.put(p + "f005", 19l); + sm1.put(EMITTED_STAT, 15l); + sm1.put(SEEN_STAT, 5l); + sm1.put(DELETES_IGNORED_STAT, 1l); + + HashMap sm2 = new HashMap<>(); + sm2.put(p + "f001", 1l); + sm2.put(p + "f002", 2l); + sm2.put(p + "f00a", 7l); + sm2.put(p + "f00b", 1l); + sm2.put(p + "f00c", 17l); + sm2.put(EMITTED_STAT, 18l); + sm2.put(SEEN_STAT, 6l); + sm2.put(DELETES_IGNORED_STAT, 2l); + + countSum.combiner(sc).merge(sm1, sm2); + + HashMap expected = new HashMap<>(); + expected.put(p + "f001", 10l); + expected.put(p + "f002", 6l); + expected.put(p + "f005", 19l); + 
expected.put(p + "f00a", 7l); + expected.put(p + "f00c", 17l); + expected.put(TOO_LONG_STAT, 0l); + expected.put(TOO_MANY_STAT, 4l); + expected.put(EMITTED_STAT, 18l + 15l); + expected.put(SEEN_STAT, 6l + 5l); + expected.put(DELETES_IGNORED_STAT, 3l); + + Assert.assertEquals(expected, sm1); + + sm2.clear(); + sm2.put(p + "f001", 19l); + sm2.put(p + "f002", 2l); + sm2.put(p + "f003", 3l); + sm2.put(p + "f00b", 13l); + sm2.put(p + "f00c", 2l); + sm2.put(TOO_LONG_STAT, 1l); + sm2.put(TOO_MANY_STAT, 3l); + sm2.put(EMITTED_STAT, 21l); + sm2.put(SEEN_STAT, 7l); + sm2.put(DELETES_IGNORED_STAT, 5l); + + countSum.combiner(sc).merge(sm1, sm2); + + expected.clear(); + expected.put(p + "f001", 29l); + expected.put(p + "f002", 8l); + expected.put(p + "f005", 19l); + expected.put(p + "f00b", 13l); + expected.put(p + "f00c", 19l); + expected.put(TOO_LONG_STAT, 1l); + expected.put(TOO_MANY_STAT, 17l); + expected.put(EMITTED_STAT, 21l + 18 + 15); + expected.put(SEEN_STAT, 7l + 6 + 5); + expected.put(DELETES_IGNORED_STAT, 8l); + } + + @Test + public void testCountDeletes() { + SummarizerConfiguration sc = SummarizerConfiguration.builder(FamilySummarizer.class).addOptions(INGNORE_DELETES_OPT, "false").build(); + FamilySummarizer countSum = new FamilySummarizer(); + + Key k1 = new Key("r1", "f1"); + Key k2 = new Key("r1", "f1"); + k2.setDeleted(true); + Key k3 = new Key("r1", "f2"); + + Collector collector = countSum.collector(sc); + collector.accept(k1, new Value("")); + collector.accept(k2, new Value("")); + collector.accept(k3, new Value("")); + + String p = COUNTER_STAT_PREFIX; + + HashMap expected = new HashMap<>(); + expected.put(p + "f1", 2l); + expected.put(p + "f2", 1l); + expected.put(TOO_LONG_STAT, 0l); + expected.put(TOO_MANY_STAT, 0l); + expected.put(SEEN_STAT, 3l); + expected.put(EMITTED_STAT, 3l); + expected.put(DELETES_IGNORED_STAT, 0l); + + HashMap stats = new HashMap<>(); + collector.summarize(stats::put); + Assert.assertEquals(expected, stats); + + CounterSummary 
csum = new CounterSummary(stats); + Assert.assertEquals(0, csum.getIgnored()); + Assert.assertEquals(0, csum.getTooMany()); + Assert.assertEquals(0, csum.getTooLong()); + Assert.assertEquals(3, csum.getSeen()); + Assert.assertEquals(3, csum.getEmitted()); + Assert.assertEquals(0, csum.getDeletesIgnored()); + + expected.clear(); + expected.put("f1", 2l); + expected.put("f2", 1l); + Assert.assertEquals(expected, csum.getCounters()); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/summary/SummaryCollectionTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/summary/SummaryCollectionTest.java b/core/src/test/java/org/apache/accumulo/core/summary/SummaryCollectionTest.java new file mode 100644 index 0000000..95702d3 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/summary/SummaryCollectionTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.accumulo.core.summary; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.core.client.summary.Summary; +import org.apache.accumulo.core.client.summary.Summary.FileStatistics; +import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer; +import org.apache.accumulo.core.summary.SummaryCollection.FileSummary; +import org.junit.Assert; +import org.junit.Test; + +public class SummaryCollectionTest { + @Test + public void testDeleted() { + SummarizerConfiguration conf = SummarizerConfiguration.builder(FamilySummarizer.class).build(); + + HashMap stats = new HashMap(); + stats.put("c:foo", 9L); + FileSummary fs1 = new FileSummary(conf, stats, false); + SummaryCollection sc1 = new SummaryCollection(Collections.singleton(fs1)); + + stats = new HashMap(); + stats.put("c:foo", 5L); + stats.put("c:bar", 3L); + FileSummary fs2 = new FileSummary(conf, stats, true); + SummaryCollection sc2 = new SummaryCollection(Collections.singleton(fs2)); + + SummaryCollection sc3 = new SummaryCollection(Collections.emptyList()); + + SummaryCollection sc4 = new SummaryCollection(Collections.emptyList(), true); + + SummarizerFactory factory = new SummarizerFactory(); + SummaryCollection mergeSc = new SummaryCollection(); + for (SummaryCollection sc : Arrays.asList(sc1, sc2, sc3, sc4, sc4)) { + mergeSc.merge(sc, factory); + } + + for (SummaryCollection sc : Arrays.asList(mergeSc, new SummaryCollection(mergeSc.toThrift()))) { + List summaries = sc.getSummaries(); + Assert.assertEquals(1, summaries.size()); + Summary summary = summaries.get(0); + FileStatistics filestats = summary.getFileStatistics(); + Assert.assertEquals(5, filestats.getTotal()); + Assert.assertEquals(1, filestats.getExtra()); + Assert.assertEquals(0, filestats.getLarge()); + Assert.assertEquals(1, filestats.getMissing()); + 
Assert.assertEquals(2, filestats.getDeleted()); + Assert.assertEquals(4, filestats.getInaccurate()); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java b/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java new file mode 100644 index 0000000..9de580d --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.accumulo.core.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.junit.Assert; +import org.junit.Test; + +public class CompletableFutureUtilTest { + @Test + public void testMerge() throws Exception { + ExecutorService es = Executors.newFixedThreadPool(3); + try { + for (int n : new int[] {1, 2, 3, 997, 1000}) { + List> futures = new ArrayList<>(); + for (int i = 1; i <= n; i++) { + final int num = i; + futures.add(CompletableFuture.supplyAsync(() -> num, es)); + } + + CompletableFuture mergedFutures = CompletableFutureUtil.merge(futures, Integer::sum, () -> 0); + Assert.assertEquals(n * (n + 1) / 2, mergedFutures.get().intValue()); + } + + // test zero + CompletableFuture mergedFutures = CompletableFutureUtil.merge(Collections.emptyList(), Integer::sum, () -> 0); + Assert.assertEquals(0, mergedFutures.get().intValue()); + } finally { + es.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/docs/src/main/asciidoc/accumulo_user_manual.asciidoc ---------------------------------------------------------------------- diff --git a/docs/src/main/asciidoc/accumulo_user_manual.asciidoc b/docs/src/main/asciidoc/accumulo_user_manual.asciidoc index fe5403c..979d943 100644 --- a/docs/src/main/asciidoc/accumulo_user_manual.asciidoc +++ b/docs/src/main/asciidoc/accumulo_user_manual.asciidoc @@ -63,6 +63,8 @@ include::chapters/kerberos.txt[] include::chapters/sampling.txt[] +include::chapters/summaries.txt[] + include::chapters/administration.txt[] include::chapters/multivolume.txt[] http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/docs/src/main/asciidoc/chapters/summaries.txt ---------------------------------------------------------------------- diff --git a/docs/src/main/asciidoc/chapters/summaries.txt 
b/docs/src/main/asciidoc/chapters/summaries.txt new file mode 100644 index 0000000..08d8011 --- /dev/null +++ b/docs/src/main/asciidoc/chapters/summaries.txt @@ -0,0 +1,232 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +== Summary Statistics + +=== Overview + +Accumulo has the ability to generate summary statistics about data in a table +using user-defined functions. Currently these statistics are only generated for +data written to files. Data recently written to Accumulo that is still in +memory will not contribute to summary statistics. + +This feature can be used to inform a user about what data is in their table. +Summary statistics can also be used by compaction strategies to make decisions +about which files to compact. + +Summary data is stored in each file Accumulo produces. Accumulo can gather +summary information from across a cluster, merging it along the way. In order +for this to be fast, the summary information should fit in cache. There is a +dedicated cache for summary data on each tserver with a configurable size. In +order for summary data to fit in cache, it should probably be small.
+ +For information on writing a custom summarizer, see the javadoc for ++org.apache.accumulo.core.client.summary.Summarizer+. The package ++org.apache.accumulo.core.client.summary.summarizers+ contains summarizer +implementations that ship with Accumulo and can be configured for use. + +=== Inaccuracies + +Summary data can be inaccurate when files are missing summary data or when +files have extra summary data. Files can contain data outside of a tablet's +boundaries. This can happen as a result of bulk imported files and tablet splits. +When this happens, those files could contain extra summary information. +Accumulo partially offsets this by storing summary information for multiple row +ranges per file. However, the ranges are not granular enough to completely +offset extra data. + +Any source of inaccuracy is reported when summary information is requested. +In the shell examples below this can be seen on the +File Statistics+ line. +For files missing summary information, the compact command in the shell has a ++--sf-no-summary+ option. This option compacts files that do not have the +summary information configured for the table. The compact command also has the ++--sf-extra-summary+ option, which will compact files with extra summary +information. + +=== Configuring + +The following tablet server and table properties configure summarization. + +* +tserver.cache.summary.size+ +* +tserver.summary.partition.threads+ +* +tserver.summary.remote.threads+ +* +tserver.summary.retrieval.threads+ +* +table.summarizer.*+ +* +table.file.summary.maxSize+ + +=== Permissions + +Because summary data may be derived from sensitive data, requesting summary data +requires a special permission. Users must have the table permission ++GET_SUMMARIES+ in order to retrieve summary data. + + +=== Bulk import + +When generating rfiles to bulk import into Accumulo, those rfiles can contain +summary data. To use this feature, look at the javadoc on the ++AccumuloFileOutputFormat.setSummarizers(...)+ method. Also, ++org.apache.accumulo.core.client.rfile.RFile+ has options for creating RFiles +with embedded summary data.
+ +=== Examples + +This example walks through using summarizers in the Accumulo shell. Below, a +table is created and some data is inserted to summarize. + + root@uno> createtable summary_test + root@uno summary_test> setauths -u root -s PI,GEO,TIME + root@uno summary_test> insert 3b503bd name last Doe + root@uno summary_test> insert 3b503bd name first John + root@uno summary_test> insert 3b503bd contact address "123 Park Ave, NY, NY" -l PI&GEO + root@uno summary_test> insert 3b503bd date birth "1/11/1942" -l PI&TIME + root@uno summary_test> insert 3b503bd date married "5/11/1962" -l PI&TIME + root@uno summary_test> insert 3b503bd contact home_phone 1-123-456-7890 -l PI + root@uno summary_test> insert d5d18dd contact address "50 Lake Shore Dr, Chicago, IL" -l PI&GEO + root@uno summary_test> insert d5d18dd name first Jane + root@uno summary_test> insert d5d18dd name last Doe + root@uno summary_test> insert d5d18dd date birth 8/15/1969 -l PI&TIME + root@uno summary_test> scan -s PI,GEO,TIME + 3b503bd contact:address [PI&GEO] 123 Park Ave, NY, NY + 3b503bd contact:home_phone [PI] 1-123-456-7890 + 3b503bd date:birth [PI&TIME] 1/11/1942 + 3b503bd date:married [PI&TIME] 5/11/1962 + 3b503bd name:first [] John + 3b503bd name:last [] Doe + d5d18dd contact:address [PI&GEO] 50 Lake Shore Dr, Chicago, IL + d5d18dd date:birth [PI&TIME] 8/15/1969 + d5d18dd name:first [] Jane + d5d18dd name:last [] Doe + +After inserting the data, summaries are requested below. No summaries are returned. + + root@uno summary_test> summaries + +The visibility summarizer is configured below and the table is flushed. +Flushing the table creates a file, generating summary data in the process. The +summary data returned counts how many times each column visibility occurred. +The statistics with a +c:+ prefix are visibilities. The others are generic +statistics created by the CountingSummarizer that VisibilitySummarizer extends.
+ + root@uno summary_test> config -t summary_test -s table.summarizer.vis=org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer + root@uno summary_test> summaries + root@uno summary_test> flush -w + 2017-02-24 19:54:46,090 [shell.Shell] INFO : Flush of table summary_test completed. + root@uno summary_test> summaries + Summarizer : org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer vis {} + File Statistics : [total:1, missing:0, extra:0, large:0] + Summary Statistics : + c: = 4 + c:PI = 1 + c:PI&GEO = 2 + c:PI&TIME = 3 + emitted = 10 + seen = 10 + tooLong = 0 + tooMany = 0 + +VisibilitySummarizer has an option +maxCounters+ that determines the max number +of column visibilities it will track. Below, this option is set and a compaction +is forced to regenerate summary data. The new summary data only has three +visibilities and now the +tooMany+ statistic is 4. This is the number of +entries whose visibility was not counted (the four entries with an empty visibility). + + root@uno summary_test> config -t summary_test -s table.summarizer.vis.opt.maxCounters=3 + root@uno summary_test> compact -w + 2017-02-24 19:54:46,267 [shell.Shell] INFO : Compacting table ... + 2017-02-24 19:54:47,127 [shell.Shell] INFO : Compaction of table summary_test completed for given range + root@uno summary_test> summaries + Summarizer : org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer vis {maxCounters=3} + File Statistics : [total:1, missing:0, extra:0, large:0] + Summary Statistics : + c:PI = 1 + c:PI&GEO = 2 + c:PI&TIME = 3 + emitted = 10 + seen = 10 + tooLong = 0 + tooMany = 4 + +Another summarizer is configured below that tracks the number of deletes. Also +a compaction strategy that uses this summary data is configured. The ++TooManyDeletesCompactionStrategy+ will force a compaction of the tablet when +the ratio of deletes to non-deletes is over 25%. This threshold is +configurable. Below, a delete is added and it's reflected in the statistics.
In +this case there is 1 delete and 10 non-deletes, not enough to force a +compaction of the tablet. + +.... +root@uno summary_test> config -t summary_test -s table.summarizer.del=org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer +root@uno summary_test> compact -w +2017-02-24 19:54:47,282 [shell.Shell] INFO : Compacting table ... +2017-02-24 19:54:49,236 [shell.Shell] INFO : Compaction of table summary_test completed for given range +root@uno summary_test> config -t summary_test -s table.compaction.major.ratio=10 +root@uno summary_test> config -t summary_test -s table.majc.compaction.strategy=org.apache.accumulo.tserver.compaction.strategies.TooManyDeletesCompactionStrategy +root@uno summary_test> deletemany -r d5d18dd -c date -f +[DELETED] d5d18dd date:birth [PI&TIME] +root@uno summary_test> flush -w +2017-02-24 19:54:49,686 [shell.Shell] INFO : Flush of table summary_test completed. +root@uno summary_test> summaries + Summarizer : org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer vis {maxCounters=3} + File Statistics : [total:2, missing:0, extra:0, large:0] + Summary Statistics : + c:PI = 1 + c:PI&GEO = 2 + c:PI&TIME = 4 + emitted = 11 + seen = 11 + tooLong = 0 + tooMany = 4 + + Summarizer : org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer del {} + File Statistics : [total:2, missing:0, extra:0, large:0] + Summary Statistics : + deletes = 1 + total = 11 +.... + +Some more deletes are added and the table is flushed below. This results in 4 +deletes and 10 non-deletes, which triggers a full compaction. A full +compaction of all files is the only time when delete markers are dropped. The +compaction ratio was set to 10 above to show that the number of files did not +trigger the compaction. After the compaction there are no deletes and 6 non-deletes. + +....
+root@uno summary_test> deletemany -r d5d18dd -f +[DELETED] d5d18dd contact:address [PI&GEO] +[DELETED] d5d18dd name:first [] +[DELETED] d5d18dd name:last [] +root@uno summary_test> flush -w +2017-02-24 19:54:52,800 [shell.Shell] INFO : Flush of table summary_test completed. +root@uno summary_test> summaries + Summarizer : org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer vis {maxCounters=3} + File Statistics : [total:1, missing:0, extra:0, large:0] + Summary Statistics : + c:PI = 1 + c:PI&GEO = 1 + c:PI&TIME = 2 + emitted = 6 + seen = 6 + tooLong = 0 + tooMany = 2 + + Summarizer : org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer del {} + File Statistics : [total:1, missing:0, extra:0, large:0] + Summary Statistics : + deletes = 0 + total = 6 +root@uno summary_test> +.... + http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java ---------------------------------------------------------------------- diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java index 3b19aeb..1c670bc 100644 --- a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java +++ b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java @@ -142,8 +142,7 @@ public class StandaloneAccumuloCluster implements AccumuloCluster { @Override public StandaloneClusterControl getClusterControl() { - return new StandaloneClusterControl(accumuloHome, clientAccumuloConfDir, serverAccumuloConfDir, - clientCmdPrefix, serverCmdPrefix); + return new StandaloneClusterControl(accumuloHome, clientAccumuloConfDir, serverAccumuloConfDir, clientCmdPrefix, serverCmdPrefix); } @Override 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java ---------------------------------------------------------------------- diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java index bf1ccc7..45028e9 100644 --- a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java +++ b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java @@ -63,7 +63,8 @@ public class StandaloneClusterControl implements ClusterControl { protected String accumuloServicePath, accumuloPath, accumuloUtilPath; - public StandaloneClusterControl(String accumuloHome, String clientAccumuloConfDir, String serverAccumuloConfDir, String clientCmdPrefix, String serverCmdPrefix) { + public StandaloneClusterControl(String accumuloHome, String clientAccumuloConfDir, String serverAccumuloConfDir, String clientCmdPrefix, + String serverCmdPrefix) { this.options = new RemoteShellOptions(); this.accumuloHome = accumuloHome; this.clientAccumuloConfDir = clientAccumuloConfDir; http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java ---------------------------------------------------------------------- diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java index 20763bb..2693c05 100644 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java @@ -139,7 +139,8 @@ public class MiniAccumuloConfigImpl { 
mergeProp(Property.TSERV_PORTSEARCH.getKey(), "true"); mergeProp(Property.TSERV_DATACACHE_SIZE.getKey(), "10M"); mergeProp(Property.TSERV_INDEXCACHE_SIZE.getKey(), "10M"); - mergeProp(Property.TSERV_MAXMEM.getKey(), "50M"); + mergeProp(Property.TSERV_SUMMARYCACHE_SIZE.getKey(), "10M"); + mergeProp(Property.TSERV_MAXMEM.getKey(), "40M"); mergeProp(Property.TSERV_WALOG_MAX_SIZE.getKey(), "100M"); mergeProp(Property.TSERV_NATIVEMAP_ENABLED.getKey(), "false"); // since there is a small amount of memory, check more frequently for majc... setting may not be needed in 1.5 http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java index c4edc96..fd7746a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java @@ -845,4 +845,9 @@ public class SecurityOperation { authenticate(credentials); return hasSystemPermission(credentials, SystemPermission.OBTAIN_DELEGATION_TOKEN, false); } + + public boolean canGetSummaries(TCredentials credentials, String tableId, String namespaceId) throws ThriftSecurityException { + authenticate(credentials); + return hasTablePermission(credentials, tableId, namespaceId, TablePermission.GET_SUMMARIES, false); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
index 47a0d18..7187e3d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -50,6 +50,8 @@ import java.util.concurrent.BlockingDeque; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -76,7 +78,9 @@ import org.apache.accumulo.core.client.impl.Translator.TKeyExtentTranslator; import org.apache.accumulo.core.client.impl.Translator.TRangeTranslator; import org.apache.accumulo.core.client.impl.Translators; import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; @@ -102,7 +106,11 @@ import org.apache.accumulo.core.data.thrift.TKeyExtent; import org.apache.accumulo.core.data.thrift.TKeyValue; import org.apache.accumulo.core.data.thrift.TMutation; import org.apache.accumulo.core.data.thrift.TRange; +import org.apache.accumulo.core.data.thrift.TRowRange; +import org.apache.accumulo.core.data.thrift.TSummaries; +import org.apache.accumulo.core.data.thrift.TSummaryRequest; import org.apache.accumulo.core.data.thrift.UpdateErrors; +import org.apache.accumulo.core.file.blockfile.cache.BlockCache; import org.apache.accumulo.core.iterators.IterationInterruptedException; import org.apache.accumulo.core.master.thrift.BulkImportState; import 
org.apache.accumulo.core.master.thrift.Compacting; @@ -119,6 +127,9 @@ import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.summary.Gatherer; +import org.apache.accumulo.core.summary.Gatherer.FileSystemResolver; +import org.apache.accumulo.core.summary.SummaryCollection; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; import org.apache.accumulo.core.tabletserver.thrift.ActiveScan; @@ -235,6 +246,7 @@ import org.apache.accumulo.tserver.session.MultiScanSession; import org.apache.accumulo.tserver.session.ScanSession; import org.apache.accumulo.tserver.session.Session; import org.apache.accumulo.tserver.session.SessionManager; +import org.apache.accumulo.tserver.session.SummarySession; import org.apache.accumulo.tserver.session.UpdateSession; import org.apache.accumulo.tserver.tablet.BulkImportCacheCleaner; import org.apache.accumulo.tserver.tablet.CommitSession; @@ -1790,6 +1802,109 @@ public class TabletServer extends AccumuloServerContext implements Runnable { log.warn("Garbage collector is attempting to remove logs through the tablet server"); log.warn("This is probably because your file Garbage Collector is an older version than your tablet servers.\n" + "Restart your file Garbage Collector."); } + + private TSummaries getSummaries(Future future) throws TimeoutException { + try { + SummaryCollection sc = future.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS); + return sc.toThrift(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + private TSummaries handleTimeout(long sessionId) { + long timeout = 
TabletServer.this.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT); + sessionManager.removeIfNotAccessed(sessionId, timeout); + return new TSummaries(false, sessionId, -1, -1, null); + } + + private TSummaries startSummaryOperation(TCredentials credentials, Future future) { + try { + return getSummaries(future); + } catch (TimeoutException e) { + long sid = sessionManager.createSession(new SummarySession(credentials, future), false); + while (sid == 0) { + sessionManager.removeSession(sid); + sid = sessionManager.createSession(new SummarySession(credentials, future), false); + } + return handleTimeout(sid); + } + } + + @Override + public TSummaries startGetSummaries(TInfo tinfo, TCredentials credentials, TSummaryRequest request) throws ThriftSecurityException, + ThriftTableOperationException, NoSuchScanIDException, TException { + String namespaceId; + try { + namespaceId = Tables.getNamespaceId(TabletServer.this.getInstance(), request.getTableId()); + } catch (TableNotFoundException e1) { + throw new ThriftTableOperationException(request.getTableId(), null, null, TableOperationExceptionType.NOTFOUND, null); + } + + if (!security.canGetSummaries(credentials, request.getTableId(), namespaceId)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + + ServerConfigurationFactory factory = TabletServer.this.getServerConfigurationFactory(); + ExecutorService es = resourceManager.getSummaryPartitionExecutor(); + Future future = new Gatherer(TabletServer.this, request, factory.getTableConfiguration(request.getTableId())).gather(es); + + return startSummaryOperation(credentials, future); + } + + @Override + public TSummaries startGetSummariesForPartition(TInfo tinfo, TCredentials credentials, TSummaryRequest request, int modulus, int remainder) + throws ThriftSecurityException, NoSuchScanIDException, TException { + // do not expect users to call this directly, expect other tservers 
to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + + ServerConfigurationFactory factory = TabletServer.this.getServerConfigurationFactory(); + ExecutorService spe = resourceManager.getSummaryRemoteExecutor(); + Future future = new Gatherer(TabletServer.this, request, factory.getTableConfiguration(request.getTableId())).processPartition(spe, + modulus, remainder); + + return startSummaryOperation(credentials, future); + } + + @Override + public TSummaries startGetSummariesFromFiles(TInfo tinfo, TCredentials credentials, TSummaryRequest request, Map> files) + throws ThriftSecurityException, NoSuchScanIDException, TException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + + ExecutorService srp = resourceManager.getSummaryRetrievalExecutor(); + TableConfiguration tableCfg = confFactory.getTableConfiguration(request.getTableId()); + BlockCache summaryCache = resourceManager.getSummaryCache(); + BlockCache indexCache = resourceManager.getIndexCache(); + FileSystemResolver volMgr = p -> fs.getVolumeByPath(p).getFileSystem(); + Future future = new Gatherer(TabletServer.this, request, tableCfg).processFiles(volMgr, files, summaryCache, indexCache, srp); + + return startSummaryOperation(credentials, future); + } + + @Override + public TSummaries contiuneGetSummaries(TInfo tinfo, long sessionId) throws NoSuchScanIDException, TException { + SummarySession session = (SummarySession) sessionManager.getSession(sessionId); + if (session == null) { + throw new NoSuchScanIDException(); + } + + Future future = session.getFuture(); + try { + TSummaries tsums = getSummaries(future); + 
sessionManager.removeSession(sessionId); + return tsums; + } catch (TimeoutException e) { + return handleTimeout(sessionId); + } + } } private class SplitRunner implements Runnable { http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 3cd7bfa..411ffa5 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -16,6 +16,7 @@ */ package org.apache.accumulo.tserver; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.util.Objects.requireNonNull; import java.io.IOException; @@ -65,7 +66,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; /** * ResourceManager is responsible for managing the resources of all tablets within a tablet server. 
@@ -86,12 +86,13 @@ public class TabletServerResourceManager { private final ExecutorService assignMetaDataPool; private final ExecutorService readAheadThreadPool; private final ExecutorService defaultReadAheadThreadPool; + private final ExecutorService summaryRetrievalPool; + private final ExecutorService summaryParitionPool; + private final ExecutorService summaryRemotePool; private final Map threadPools = new TreeMap<>(); private final ConcurrentHashMap activeAssignments; - private final VolumeManager fs; - private final FileManager fileManager; private final MemoryManager memoryManager; @@ -100,6 +101,7 @@ public class TabletServerResourceManager { private final BlockCache _dCache; private final BlockCache _iCache; + private final BlockCache _sCache; private final TabletServer tserver; private final ServerConfigurationFactory conf; @@ -141,6 +143,14 @@ public class TabletServerResourceManager { return createEs(max, name, new LinkedBlockingQueue()); } + private ExecutorService createIdlingEs(Property max, String name, long timeout, TimeUnit timeUnit) { + LinkedBlockingQueue queue = new LinkedBlockingQueue(); + int maxThreads = conf.getConfiguration().getCount(max); + ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, timeout, timeUnit, queue, new NamingThreadFactory(name)); + tp.allowCoreThreadTimeOut(true); + return addEs(max, name, tp); + } + private ExecutorService createEs(Property max, String name, BlockingQueue queue) { int maxThreads = conf.getConfiguration().getCount(max); ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L, TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name)); @@ -154,7 +164,6 @@ public class TabletServerResourceManager { public TabletServerResourceManager(TabletServer tserver, VolumeManager fs) { this.tserver = tserver; this.conf = tserver.getServerConfigurationFactory(); - this.fs = fs; final AccumuloConfiguration acuConf = conf.getConfiguration(); long maxMemory = 
acuConf.getMemoryInBytes(Property.TSERV_MAXMEM); @@ -163,15 +172,18 @@ public class TabletServerResourceManager { long blockSize = acuConf.getMemoryInBytes(Property.TSERV_DEFAULT_BLOCKSIZE); long dCacheSize = acuConf.getMemoryInBytes(Property.TSERV_DATACACHE_SIZE); long iCacheSize = acuConf.getMemoryInBytes(Property.TSERV_INDEXCACHE_SIZE); + long sCacheSize = acuConf.getMemoryInBytes(Property.TSERV_SUMMARYCACHE_SIZE); long totalQueueSize = acuConf.getMemoryInBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX); String policy = acuConf.get(Property.TSERV_CACHE_POLICY); if (policy.equalsIgnoreCase("LRU")) { _iCache = new LruBlockCache(iCacheSize, blockSize); _dCache = new LruBlockCache(dCacheSize, blockSize); + _sCache = new LruBlockCache(sCacheSize, blockSize); } else if (policy.equalsIgnoreCase("TinyLFU")) { _iCache = new TinyLfuBlockCache(iCacheSize, blockSize); _dCache = new TinyLfuBlockCache(dCacheSize, blockSize); + _sCache = new TinyLfuBlockCache(sCacheSize, blockSize); } else { throw new IllegalArgumentException("Unknown Block cache policy " + policy); } @@ -179,14 +191,14 @@ public class TabletServerResourceManager { Runtime runtime = Runtime.getRuntime(); if (usingNativeMap) { // Still check block cache sizes when using native maps. 
- if (dCacheSize + iCacheSize + totalQueueSize > runtime.maxMemory()) { + if (dCacheSize + iCacheSize + sCacheSize + totalQueueSize > runtime.maxMemory()) { throw new IllegalArgumentException(String.format("Block cache sizes %,d and mutation queue size %,d is too large for this JVM configuration %,d", - dCacheSize + iCacheSize, totalQueueSize, runtime.maxMemory())); + dCacheSize + iCacheSize + sCacheSize, totalQueueSize, runtime.maxMemory())); } - } else if (maxMemory + dCacheSize + iCacheSize + totalQueueSize > runtime.maxMemory()) { + } else if (maxMemory + dCacheSize + iCacheSize + sCacheSize + totalQueueSize > runtime.maxMemory()) { throw new IllegalArgumentException(String.format( "Maximum tablet server map memory %,d block cache sizes %,d and mutation queue size %,d is too large for this JVM configuration %,d", maxMemory, - dCacheSize + iCacheSize, totalQueueSize, runtime.maxMemory())); + dCacheSize + iCacheSize + sCacheSize, totalQueueSize, runtime.maxMemory())); } runtime.gc(); @@ -222,6 +234,10 @@ public class TabletServerResourceManager { readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT, "tablet read ahead"); defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT, "metadata tablets read ahead"); + summaryRetrievalPool = createIdlingEs(Property.TSERV_SUMMARY_RETRIEVAL_THREADS, "summary file retriever", 60, TimeUnit.SECONDS); + summaryRemotePool = createIdlingEs(Property.TSERV_SUMMARY_REMOTE_THREADS, "summary remote", 60, TimeUnit.SECONDS); + summaryParitionPool = createIdlingEs(Property.TSERV_SUMMARY_PARTITION_THREADS, "summary partition", 60, TimeUnit.SECONDS); + int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES); fileManager = new FileManager(tserver, fs, maxOpenFiles, _dCache, _iCache); @@ -657,7 +673,7 @@ public class TabletServerResourceManager { CompactionStrategy strategy = Property.createTableInstanceFromPropertyName(tableConf, Property.TABLE_COMPACTION_STRATEGY, 
CompactionStrategy.class, new DefaultCompactionStrategy()); strategy.init(Property.getCompactionStrategyOptions(tableConf)); - MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, TabletServerResourceManager.this.fs, tableConf); + MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, tableConf); request.setFiles(tabletFiles); try { return strategy.shouldCompact(request); @@ -760,4 +776,19 @@ public class TabletServerResourceManager { return _dCache; } + public BlockCache getSummaryCache() { + return _sCache; + } + + public ExecutorService getSummaryRetrievalExecutor() { + return summaryRetrievalPool; + } + + public ExecutorService getSummaryPartitionExecutor() { + return summaryParitionPool; + } + + public ExecutorService getSummaryRemoteExecutor() { + return summaryRemotePool; + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java index 0e0089a..85a4a13 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java @@ -32,7 +32,6 @@ import java.util.Map; * compaction thread. */ public abstract class CompactionStrategy { - /** * The settings for the compaction strategy pulled from zookeeper. The table.compacations.major.strategy.opts part of the setting will be removed. */