Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C54B8200C3E for ; Tue, 7 Mar 2017 01:50:13 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id C40DB160B89; Tue, 7 Mar 2017 00:50:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F193E160B87 for ; Tue, 7 Mar 2017 01:50:12 +0100 (CET) Received: (qmail 75716 invoked by uid 500); 7 Mar 2017 00:50:11 -0000 Mailing-List: contact dev-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list dev@accumulo.apache.org Received: (qmail 75146 invoked by uid 99); 7 Mar 2017 00:50:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Mar 2017 00:50:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 55355DFA6F; Tue, 7 Mar 2017 00:50:11 +0000 (UTC) From: joshelser To: dev@accumulo.apache.org Reply-To: dev@accumulo.apache.org References: In-Reply-To: Subject: [GitHub] accumulo pull request #224: ACCUMULO-4500 ACCUMULO-96 Added summarization Content-Type: text/plain Message-Id: <20170307005011.55355DFA6F@git1-us-west.apache.org> Date: Tue, 7 Mar 2017 00:50:11 +0000 (UTC) archived-at: Tue, 07 Mar 2017 00:50:13 -0000 Github user joshelser commented on a diff in the pull request: https://github.com/apache/accumulo/pull/224#discussion_r104516639 --- Diff: core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java --- @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.summary; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.file.blockfile.cache.BlockCache; +import org.apache.accumulo.core.file.blockfile.cache.CacheEntry; +import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; +import org.apache.accumulo.core.file.rfile.RFile.Reader; +import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist; +import org.apache.accumulo.core.summary.Gatherer.RowRange; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.io.WritableUtils; + +public class SummaryReader { + + private static interface BlockReader { + DataInputStream getMetaBlock(String name) throws IOException; + } + + private static class CompositeCache implements BlockCache { + + private BlockCache summaryCache; + private BlockCache indexCache; + + CompositeCache(BlockCache summaryCache, BlockCache indexCache) { + this.summaryCache = summaryCache; + this.indexCache = indexCache; + } + + @Override + public CacheEntry cacheBlock(String blockName, byte[] buf) { + return summaryCache.cacheBlock(blockName, buf); + } + + @Override + public CacheEntry cacheBlock(String blockName, byte[] buf, boolean inMemory) { + return summaryCache.cacheBlock(blockName, buf, inMemory); + } + + @Override + public CacheEntry getBlock(String blockName) { + CacheEntry ce = summaryCache.getBlock(blockName); + if (ce == null) { + // Its possible the index cache may have this info, so check there. This is an opportunistic check. + ce = indexCache.getBlock(blockName); + } + return ce; + } + + @Override + public long getMaxSize() { + return summaryCache.getMaxSize(); + } + + @Override + public Stats getStats() { + return summaryCache.getStats(); + } + } + + private static List load(BlockReader bcReader, Predicate summarySelector) throws IOException { + + try (DataInputStream in = bcReader.getMetaBlock(SummaryWriter.METASTORE_INDEX)) { + List stores = new ArrayList<>(); + + readHeader(in); + int numSummaries = WritableUtils.readVInt(in); + for (int i = 0; i < numSummaries; i++) { + SummarizerConfiguration conf = readConfig(in); + boolean inline = in.readBoolean(); + if (inline) { + if (summarySelector.test(conf)) { + stores.add(SummarySerializer.load(conf, in)); + } else { + SummarySerializer.skip(in); + } + } else { + int block = WritableUtils.readVInt(in); + int offset = WritableUtils.readVInt(in); + if (summarySelector.test(conf)) { + try (DataInputStream summaryIn = bcReader.getMetaBlock(SummaryWriter.METASTORE_PREFIX + "." + block)) { + long skipped = in.skip(offset); + while (skipped < offset) { + skipped += in.skip(offset - skipped); + } + stores.add(SummarySerializer.load(conf, summaryIn)); + } catch (MetaBlockDoesNotExist e) { + // this is unexpected + throw new IOException(e); + } + } + } + } + + return stores; + } catch (MetaBlockDoesNotExist e) { + return Collections.emptyList(); + } + } + + private static SummaryReader load(CachableBlockFile.Reader bcReader, Predicate summarySelector, SummarizerFactory factory) + throws IOException { + SummaryReader fileSummaries = new SummaryReader(); + fileSummaries.summaryStores = load(name -> bcReader.getMetaBlock(name), summarySelector); + fileSummaries.factory = factory; + return fileSummaries; + } + + public static SummaryReader load(Configuration conf, AccumuloConfiguration aConf, InputStream inputStream, long length, + Predicate summarySelector, SummarizerFactory factory) throws IOException { + org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.Reader bcReader = new CachableBlockFile.Reader((InputStream & Seekable) inputStream, length, + conf, aConf); + return load(bcReader, summarySelector, factory); + } + + public static SummaryReader load(FileSystem fs, Configuration conf, AccumuloConfiguration aConf, SummarizerFactory factory, Path file, + Predicate summarySelector, BlockCache summaryCache, BlockCache indexCache) throws IllegalArgumentException, IOException { + CachableBlockFile.Reader bcReader = null; + + try { + // the reason BCFile is used instead of RFile is to avoid reading in the RFile meta block when only summary data is wanted. + CompositeCache compositeCache = new CompositeCache(summaryCache, indexCache); + bcReader = new CachableBlockFile.Reader(fs, file, conf, null, compositeCache, aConf); + return load(bcReader, summarySelector, factory); + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + if (bcReader != null) { + try { + bcReader.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + } + + private static void print(FileSKVIterator fsi, String indent, PrintStream out) throws IOException { + + out.printf("Summary data : \n"); + + List stores = load(name -> fsi.getMetaStore(name), conf -> true); + int i = 1; + for (SummarySerializer summaryStore : stores) { + out.printf("%sSummary %d of %d generated by : %s\n", indent, i, stores.size(), summaryStore.getSummarizerConfiguration()); + i++; + summaryStore.print(indent, indent, out); + } + } + + public static void print(Reader iter) throws IOException { --- End diff -- If the `PrintStream` is configurable, there should be a `public` entry-point for it too, no? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---