accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [5/9] accumulo git commit: ACCUMULO-4501 ACCUMULO-96 Added Summarization
Date Mon, 20 Mar 2017 14:49:01 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/SummaryCollection.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryCollection.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryCollection.java
new file mode 100644
index 0000000..cc688c9
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryCollection.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.accumulo.core.data.thrift.TSummaries;
+import org.apache.accumulo.core.data.thrift.TSummarizerConfiguration;
+import org.apache.accumulo.core.data.thrift.TSummary;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class facilitates merging, storing, and serializing (to/from thrift) intermediate
summary information.
+ */
+public class SummaryCollection {
+
+  private static class MergedSummary {
+    Map<String,Long> summary;
+    long filesContaining;
+    long filesExceedingBoundry;
+    long filesLarge;
+
+    public MergedSummary(FileSummary entry) {
+      this.summary = entry.summary;
+      this.filesContaining = 1;
+      this.filesExceedingBoundry = entry.exceededBoundry ? 1 : 0;
+      this.filesLarge = entry.exceededMaxSize ? 1 : 0;
+    }
+
+    public MergedSummary(TSummary tSummary) {
+      this.summary = new HashMap<>(tSummary.getSummary());
+      this.filesContaining = tSummary.getFilesContaining();
+      this.filesExceedingBoundry = tSummary.getFilesExceeding();
+      this.filesLarge = tSummary.getFilesLarge();
+    }
+
+    public void merge(MergedSummary other, SummarizerConfiguration config, SummarizerFactory
factory) {
+
+      if (summary == null && other.summary != null) {
+        summary = new HashMap<>(other.summary);
+      } else if (summary != null && other.summary != null) {
+        Summarizer summarizer = factory.getSummarizer(config);
+        summarizer.combiner(config).merge(summary, other.summary);
+      }
+
+      filesContaining += other.filesContaining;
+      filesExceedingBoundry += other.filesExceedingBoundry;
+      filesLarge += other.filesLarge;
+    }
+
+    public TSummary toThrift(SummarizerConfiguration key) {
+      TSummarizerConfiguration tsumConf = SummarizerConfigurationUtil.toThrift(key);
+      return new TSummary(summary, tsumConf, filesContaining, filesExceedingBoundry, filesLarge);
+    }
+
+  }
+
+  private Map<SummarizerConfiguration,MergedSummary> mergedSummaries;
+  private long totalFiles;
+  private long deletedFiles;
+
+  public SummaryCollection() {
+    mergedSummaries = new HashMap<>();
+    totalFiles = 0;
+  }
+
+  public SummaryCollection(TSummaries tsums) {
+    mergedSummaries = new HashMap<>();
+    for (TSummary tSummary : tsums.getSummaries()) {
+      SummarizerConfiguration sconf = SummarizerConfigurationUtil.fromThrift(tSummary.getConfig());
+      mergedSummaries.put(sconf, new MergedSummary(tSummary));
+    }
+
+    totalFiles = tsums.getTotalFiles();
+    deletedFiles = tsums.getDeletedFiles();
+  }
+
+  SummaryCollection(Collection<FileSummary> initialEntries) {
+    this(initialEntries, false);
+  }
+
+  SummaryCollection(Collection<FileSummary> initialEntries, boolean deleted) {
+    if (deleted) {
+      Preconditions.checkArgument(initialEntries.size() == 0);
+    }
+    mergedSummaries = new HashMap<>();
+    for (FileSummary entry : initialEntries) {
+      mergedSummaries.put(entry.conf, new MergedSummary(entry));
+    }
+    totalFiles = 1;
+    this.deletedFiles = deleted ? 1 : 0;
+  }
+
+  static class FileSummary {
+
+    private SummarizerConfiguration conf;
+    private Map<String,Long> summary;
+    private boolean exceededBoundry;
+    private boolean exceededMaxSize;
+
+    FileSummary(SummarizerConfiguration conf, Map<String,Long> summary, boolean exceededBoundry)
{
+      this.conf = conf;
+      this.summary = summary;
+      this.exceededBoundry = exceededBoundry;
+      this.exceededMaxSize = false;
+    }
+
+    FileSummary(SummarizerConfiguration conf) {
+      this.conf = conf;
+      this.summary = new HashMap<>();
+      ;
+      this.exceededBoundry = false;
+      this.exceededMaxSize = true;
+    }
+  }
+
+  public void merge(SummaryCollection other, SummarizerFactory factory) {
+    for (Entry<SummarizerConfiguration,MergedSummary> entry : other.mergedSummaries.entrySet())
{
+      MergedSummary ms = mergedSummaries.get(entry.getKey());
+      if (ms == null) {
+        mergedSummaries.put(entry.getKey(), entry.getValue());
+      } else {
+        ms.merge(entry.getValue(), entry.getKey(), factory);
+      }
+    }
+
+    this.totalFiles += other.totalFiles;
+    this.deletedFiles += other.deletedFiles;
+  }
+
+  public static SummaryCollection merge(SummaryCollection sc1, SummaryCollection sc2, SummarizerFactory
factory) {
+    SummaryCollection ret = new SummaryCollection();
+    ret.merge(sc1, factory);
+    ret.merge(sc2, factory);
+    return ret;
+  }
+
+  public List<Summary> getSummaries() {
+    ArrayList<Summary> ret = new ArrayList<>(mergedSummaries.size());
+
+    for (Entry<SummarizerConfiguration,MergedSummary> entry : mergedSummaries.entrySet())
{
+      SummarizerConfiguration config = entry.getKey();
+      MergedSummary ms = entry.getValue();
+
+      ret.add(new Summary(ms.summary, config, totalFiles, (totalFiles - deletedFiles) - ms.filesContaining,
ms.filesExceedingBoundry, ms.filesLarge,
+          deletedFiles));
+    }
+
+    return ret;
+  }
+
+  public long getTotalFiles() {
+    return totalFiles;
+  }
+
+  public TSummaries toThrift() {
+    List<TSummary> summaries = new ArrayList<>(mergedSummaries.size());
+    for (Entry<SummarizerConfiguration,MergedSummary> entry : mergedSummaries.entrySet())
{
+      summaries.add(entry.getValue().toThrift(entry.getKey()));
+    }
+
+    return new TSummaries(true, -1l, totalFiles, deletedFiles, summaries);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/SummaryInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryInfo.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryInfo.java
new file mode 100644
index 0000000..7b9ebe4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.Text;
+
+class SummaryInfo {
+
+  final Map<String,Long> summary;
+  final Text lastRow;
+  final int count;
+
+  SummaryInfo(Text row, Map<String,Long> summary, int count) {
+    this.lastRow = row;
+    this.summary = summary;
+    this.count = count;
+  }
+
+  SummaryInfo(byte[] row, Map<String,Long> summary, int count) {
+    this.lastRow = new Text(row);
+    this.summary = summary;
+    this.count = count;
+  }
+
+  Text getLastRow() {
+    return lastRow;
+  }
+
+  Map<String,Long> getSummary() {
+    return summary;
+  }
+
+  int getCount() {
+    return count;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
new file mode 100644
index 0000000..9b2b5d9
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
+import org.apache.accumulo.core.summary.Gatherer.RowRange;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.WritableUtils;
+
+public class SummaryReader {
+
+  private static interface BlockReader {
+    DataInputStream getMetaBlock(String name) throws IOException;
+  }
+
+  private static class CompositeCache implements BlockCache {
+
+    private BlockCache summaryCache;
+    private BlockCache indexCache;
+
+    CompositeCache(BlockCache summaryCache, BlockCache indexCache) {
+      this.summaryCache = summaryCache;
+      this.indexCache = indexCache;
+    }
+
+    @Override
+    public CacheEntry cacheBlock(String blockName, byte[] buf) {
+      return summaryCache.cacheBlock(blockName, buf);
+    }
+
+    @Override
+    public CacheEntry cacheBlock(String blockName, byte[] buf, boolean inMemory) {
+      return summaryCache.cacheBlock(blockName, buf, inMemory);
+    }
+
+    @Override
+    public CacheEntry getBlock(String blockName) {
+      CacheEntry ce = summaryCache.getBlock(blockName);
+      if (ce == null) {
+        // Its possible the index cache may have this info, so check there. This is an opportunistic
check.
+        ce = indexCache.getBlock(blockName);
+      }
+      return ce;
+    }
+
+    @Override
+    public long getMaxSize() {
+      return summaryCache.getMaxSize();
+    }
+
+    @Override
+    public Stats getStats() {
+      return summaryCache.getStats();
+    }
+  }
+
+  private static List<SummarySerializer> load(BlockReader bcReader, Predicate<SummarizerConfiguration>
summarySelector) throws IOException {
+
+    try (DataInputStream in = bcReader.getMetaBlock(SummaryWriter.METASTORE_INDEX)) {
+      List<SummarySerializer> stores = new ArrayList<>();
+
+      readHeader(in);
+      int numSummaries = WritableUtils.readVInt(in);
+      for (int i = 0; i < numSummaries; i++) {
+        SummarizerConfiguration conf = readConfig(in);
+        boolean inline = in.readBoolean();
+        if (inline) {
+          if (summarySelector.test(conf)) {
+            stores.add(SummarySerializer.load(conf, in));
+          } else {
+            SummarySerializer.skip(in);
+          }
+        } else {
+          int block = WritableUtils.readVInt(in);
+          int offset = WritableUtils.readVInt(in);
+          if (summarySelector.test(conf)) {
+            try (DataInputStream summaryIn = bcReader.getMetaBlock(SummaryWriter.METASTORE_PREFIX
+ "." + block)) {
+              long skipped = in.skip(offset);
+              while (skipped < offset) {
+                skipped += in.skip(offset - skipped);
+              }
+              stores.add(SummarySerializer.load(conf, summaryIn));
+            } catch (MetaBlockDoesNotExist e) {
+              // this is unexpected
+              throw new IOException(e);
+            }
+          }
+        }
+      }
+
+      return stores;
+    } catch (MetaBlockDoesNotExist e) {
+      return Collections.emptyList();
+    }
+  }
+
+  private static SummaryReader load(CachableBlockFile.Reader bcReader, Predicate<SummarizerConfiguration>
summarySelector, SummarizerFactory factory)
+      throws IOException {
+    SummaryReader fileSummaries = new SummaryReader();
+    fileSummaries.summaryStores = load(name -> bcReader.getMetaBlock(name), summarySelector);
+    fileSummaries.factory = factory;
+    return fileSummaries;
+  }
+
+  public static SummaryReader load(Configuration conf, AccumuloConfiguration aConf, InputStream
inputStream, long length,
+      Predicate<SummarizerConfiguration> summarySelector, SummarizerFactory factory)
throws IOException {
+    org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.Reader bcReader = new
CachableBlockFile.Reader((InputStream & Seekable) inputStream, length,
+        conf, aConf);
+    return load(bcReader, summarySelector, factory);
+  }
+
+  public static SummaryReader load(FileSystem fs, Configuration conf, AccumuloConfiguration
aConf, SummarizerFactory factory, Path file,
+      Predicate<SummarizerConfiguration> summarySelector, BlockCache summaryCache,
BlockCache indexCache) {
+    CachableBlockFile.Reader bcReader = null;
+
+    try {
+      // the reason BCFile is used instead of RFile is to avoid reading in the RFile meta
block when only summary data is wanted.
+      CompositeCache compositeCache = new CompositeCache(summaryCache, indexCache);
+      bcReader = new CachableBlockFile.Reader(fs, file, conf, null, compositeCache, aConf);
+      return load(bcReader, summarySelector, factory);
+    } catch (FileNotFoundException fne) {
+      SummaryReader sr = new SummaryReader();
+      sr.factory = factory;
+      sr.summaryStores = Collections.emptyList();
+      sr.deleted = true;
+      return sr;
+    } catch (IOException e) {
+      try {
+        if (!fs.exists(file)) {
+          SummaryReader sr = new SummaryReader();
+          sr.factory = factory;
+          sr.summaryStores = Collections.emptyList();
+          sr.deleted = true;
+          return sr;
+        }
+      } catch (IOException e1) {}
+      throw new UncheckedIOException(e);
+    } finally {
+      if (bcReader != null) {
+        try {
+          bcReader.close();
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+    }
+
+  }
+
+  private static void print(FileSKVIterator fsi, String indent, PrintStream out) throws IOException
{
+
+    out.printf("Summary data : \n");
+
+    List<SummarySerializer> stores = load(name -> fsi.getMetaStore(name), conf ->
true);
+    int i = 1;
+    for (SummarySerializer summaryStore : stores) {
+      out.printf("%sSummary %d of %d generated by : %s\n", indent, i, stores.size(), summaryStore.getSummarizerConfiguration());
+      i++;
+      summaryStore.print(indent, indent, out);
+    }
+  }
+
+  public static void print(Reader iter, PrintStream out) throws IOException {
+    print(iter, "   ", out);
+  }
+
+  private static SummarizerConfiguration readConfig(DataInputStream in) throws IOException
{
+    // read summarizer configuration
+    String summarizerClazz = in.readUTF();
+    String configId = in.readUTF();
+    org.apache.accumulo.core.client.summary.SummarizerConfiguration.Builder scb = SummarizerConfiguration.builder(summarizerClazz).setPropertyId(configId);
+    int numOpts = WritableUtils.readVInt(in);
+    for (int i = 0; i < numOpts; i++) {
+      String k = in.readUTF();
+      String v = in.readUTF();
+      scb.addOption(k, v);
+    }
+
+    return scb.build();
+  }
+
+  private static byte readHeader(DataInputStream in) throws IOException {
+    long magic = in.readLong();
+    if (magic != SummaryWriter.MAGIC) {
+      throw new IOException("Bad magic : " + String.format("%x", magic));
+    }
+
+    byte ver = in.readByte();
+    if (ver != SummaryWriter.VER) {
+      throw new IOException("Unknown version : " + ver);
+    }
+
+    return ver;
+  }
+
+  private List<SummarySerializer> summaryStores;
+
+  private SummarizerFactory factory;
+
+  private boolean deleted;
+
+  public SummaryCollection getSummaries(List<RowRange> ranges) {
+
+    List<SummaryCollection.FileSummary> initial = new ArrayList<>();
+    if (deleted) {
+      return new SummaryCollection(initial, true);
+    }
+    for (SummarySerializer summaryStore : summaryStores) {
+      if (summaryStore.exceededMaxSize()) {
+        initial.add(new SummaryCollection.FileSummary(summaryStore.getSummarizerConfiguration()));
+      } else {
+        Map<String,Long> summary = summaryStore.getSummary(ranges, factory);
+        boolean exceeded = summaryStore.exceedsRange(ranges);
+        initial.add(new SummaryCollection.FileSummary(summaryStore.getSummarizerConfiguration(),
summary, exceeded));
+      }
+    }
+
+    return new SummaryCollection(initial);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/SummarySerializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummarySerializer.java b/core/src/main/java/org/apache/accumulo/core/summary/SummarySerializer.java
new file mode 100644
index 0000000..d76bd1a
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummarySerializer.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.Summarizer.Collector;
+import org.apache.accumulo.core.client.summary.Summarizer.Combiner;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.summary.Gatherer.RowRange;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * This class supports serializing summaries and periodically storing summaries. The implementation
+ * attempts to generate around 10 summaries that are evenly
+ * spaced. This allows asking for summaries for sub-ranges of data in an rfile.
+ *
+ * <p>
+ * At first summaries are created for every 1000 key values. After 10 summaries are added,
+ * the 10 summaries are merged to 5 and summaries are then created for
+ * every 2000 key values. The code keeps merging summaries and doubling the amount of key
+ * values per summary. This results in each summary covering about the
+ * same number of key values.
+ *
+ */
+
+class SummarySerializer {
+
+  private SummarizerConfiguration sconf;
+  private LgSummaries[] allSummaries;
+
+  private SummarySerializer(SummarizerConfiguration sconf, LgSummaries[] allSummaries) {
+    this.sconf = sconf;
+    this.allSummaries = allSummaries;
+  }
+
+  private SummarySerializer(SummarizerConfiguration sconf) {
+    this.sconf = sconf;
+    // this indicates max size was exceeded
+    this.allSummaries = null;
+  }
+
+  public SummarizerConfiguration getSummarizerConfiguration() {
+    return sconf;
+  }
+
+  public void print(String prefix, String indent, PrintStream out) {
+
+    if (allSummaries == null) {
+      out.printf("%sSummary not stored because it was too large\n", prefix + indent);
+    } else {
+      for (LgSummaries lgs : allSummaries) {
+        lgs.print(prefix, indent, out);
+      }
+    }
+  }
+
+  public Map<String,Long> getSummary(List<RowRange> ranges, SummarizerFactory
sf) {
+
+    Summarizer kvs = sf.getSummarizer(sconf);
+
+    Map<String,Long> summary = new HashMap<>();
+    for (LgSummaries lgs : allSummaries) {
+      lgs.getSummary(ranges, kvs.combiner(sconf), summary);
+    }
+    return summary;
+  }
+
+  public boolean exceedsRange(List<RowRange> ranges) {
+    boolean er = false;
+    for (LgSummaries lgs : allSummaries) {
+      for (RowRange ke : ranges) {
+        er |= lgs.exceedsRange(ke.getStartRow(), ke.getEndRow());
+        if (er) {
+          return er;
+        }
+      }
+    }
+
+    return er;
+  }
+
+  public boolean exceededMaxSize() {
+    return allSummaries == null;
+  }
+
+  private static class SummaryStoreImpl implements org.apache.accumulo.core.client.summary.Summarizer.StatisticConsumer
{
+
+    HashMap<String,Long> summaries;
+
+    @Override
+    public void accept(String summary, long value) {
+      summaries.put(summary, value);
+    }
+  }
+
+  private static class LgBuilder {
+    private Summarizer summarizer;
+    private SummarizerConfiguration conf;
+    private Collector collector;
+
+    private int maxSummaries = 10;
+
+    private int cutoff = 1000;
+    private int count = 0;
+
+    private List<SummaryInfo> summaries = new ArrayList<>();
+
+    private Key lastKey;
+
+    private SummaryStoreImpl sci = new SummaryStoreImpl();
+
+    private String name;
+
+    private boolean sawFirst = false;
+    private Text firstRow;
+
+    private boolean finished = false;
+
+    public LgBuilder(SummarizerConfiguration conf, Summarizer kvs) {
+      this.conf = conf;
+      this.summarizer = kvs;
+      this.name = "<DEFAULT>";
+      this.collector = kvs.collector(conf);
+    }
+
+    public LgBuilder(SummarizerConfiguration conf, Summarizer kvs, String name) {
+      this.conf = conf;
+      this.summarizer = kvs;
+      this.name = name;
+      this.collector = kvs.collector(conf);
+    }
+
+    public void put(Key k, Value v) {
+      collector.accept(k, v);
+      count++;
+
+      if (!sawFirst) {
+        firstRow = k.getRow();
+        sawFirst = true;
+
+      }
+
+      if (count >= cutoff) {
+        sci.summaries = new HashMap<>();
+        collector.summarize(sci);
+        collector = summarizer.collector(conf);
+        addSummary(k.getRow(), sci.summaries, count);
+        count = 0;
+      }
+
+      lastKey = k;
+    }
+
+    private List<SummaryInfo> merge(int end) {
+      List<SummaryInfo> mergedSummaries = new ArrayList<>();
+      for (int i = 0; i < end; i += 2) {
+        int mergedCount = summaries.get(i).count + summaries.get(i + 1).count;
+        summarizer.combiner(conf).merge(summaries.get(i).summary, summaries.get(i + 1).summary);
+        mergedSummaries.add(new SummaryInfo(summaries.get(i + 1).getLastRow(), summaries.get(i).summary,
mergedCount));
+      }
+      return mergedSummaries;
+    }
+
+    private void addSummary(Text row, Map<String,Long> summary, int count) {
+      Preconditions.checkState(!finished);
+      summaries.add(new SummaryInfo(row, summary, count));
+
+      if (summaries.size() % 2 == 0 && summaries.size() > maxSummaries) {
+        summaries = merge(summaries.size());
+        cutoff *= 2;
+      }
+    }
+
+    boolean collapse() {
+      Preconditions.checkState(finished);
+      if (summaries.size() <= 1) {
+        return false;
+      }
+
+      int end = summaries.size();
+      if (end % 2 == 1) {
+        end--;
+      }
+
+      List<SummaryInfo> mergedSummaries = merge(end);
+
+      if (summaries.size() % 2 == 1) {
+        mergedSummaries.add(summaries.get(summaries.size() - 1));
+      }
+
+      summaries = mergedSummaries;
+
+      return true;
+    }
+
+    void finish() {
+      Preconditions.checkState(!finished);
+      // summarize last data
+      if (count > 0) {
+        sci.summaries = new HashMap<>();
+        collector.summarize(sci);
+        collector = null;
+        addSummary(lastKey.getRow(), sci.summaries, count);
+        count = 0;
+        finished = true;
+      }
+    }
+
+    public void save(DataOutputStream dos, HashMap<String,Integer> symbolTable) throws
IOException {
+      Preconditions.checkState(count == 0);
+
+      dos.writeUTF(name);
+
+      if (firstRow == null) {
+        WritableUtils.writeVInt(dos, 0);
+      } else {
+        firstRow.write(dos);
+      }
+
+      // write summaries
+      WritableUtils.writeVInt(dos, summaries.size());
+      for (SummaryInfo summaryInfo : summaries) {
+        summaryInfo.getLastRow().write(dos);
+        WritableUtils.writeVInt(dos, summaryInfo.count);
+        saveSummary(dos, symbolTable, summaryInfo.summary);
+      }
+    }
+
+    private void saveSummary(DataOutputStream dos, HashMap<String,Integer> symbolTable,
Map<String,Long> summary) throws IOException {
+      WritableUtils.writeVInt(dos, summary.size());
+      for (Entry<String,Long> e : summary.entrySet()) {
+        WritableUtils.writeVInt(dos, symbolTable.get(e.getKey()));
+        WritableUtils.writeVLong(dos, e.getValue());
+      }
+    }
+  }
+
+  public static class Builder {
+    private Summarizer kvs;
+
+    private SummarizerConfiguration conf;
+
+    private List<LgBuilder> locGroups;
+    private LgBuilder lgb;
+
+    private long maxSize;
+
+    public Builder(SummarizerConfiguration conf, Summarizer kvs, long maxSize) {
+      this.conf = conf;
+      this.kvs = kvs;
+      this.locGroups = new ArrayList<>();
+      this.maxSize = maxSize;
+    }
+
+    public void put(Key k, Value v) {
+      lgb.put(k, v);
+    }
+
+    public SummarizerConfiguration getSummarizerConfiguration() {
+      return conf;
+    }
+
+    public void save(DataOutputStream dos) throws IOException {
+
+      if (lgb != null) {
+        lgb.finish();
+        locGroups.add(lgb);
+      }
+
+      byte[] data = _save();
+
+      while (data.length > maxSize) {
+        boolean collapsedSome = false;
+        for (LgBuilder lgBuilder : locGroups) {
+          collapsedSome |= lgBuilder.collapse();
+        }
+
+        if (collapsedSome) {
+          data = _save();
+        } else {
+          break;
+        }
+      }
+
+      if (data.length > maxSize) {
+        dos.writeBoolean(true);
+      } else {
+        dos.writeBoolean(false);
+        // write this out to support efficient skipping
+        WritableUtils.writeVInt(dos, data.length);
+        dos.write(data);
+      }
+    }
+
+    private byte[] _save() throws IOException {
+
+      try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos
= new DataOutputStream(baos)) {
+        // create a symbol table
+        HashMap<String,Integer> symbolTable = new HashMap<>();
+        ArrayList<String> symbols = new ArrayList<>();
+        for (LgBuilder lg : locGroups) {
+          for (SummaryInfo si : lg.summaries) {
+            for (String symbol : si.summary.keySet()) {
+              if (!symbolTable.containsKey(symbol)) {
+                symbolTable.put(symbol, symbols.size());
+                symbols.add(symbol);
+              }
+            }
+          }
+        }
+
+        // write symbol table
+        WritableUtils.writeVInt(dos, symbols.size());
+        for (String symbol : symbols) {
+          dos.writeUTF(symbol);
+        }
+
+        WritableUtils.writeVInt(dos, locGroups.size());
+        for (LgBuilder lg : locGroups) {
+          lg.save(dos, symbolTable);
+        }
+
+        dos.close();
+        return baos.toByteArray();
+      }
+    }
+
+    public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies)
{
+      if (lgb != null) {
+        lgb.finish();
+        locGroups.add(lgb);
+      }
+
+      lgb = new LgBuilder(conf, kvs, name);
+    }
+
+    public void startDefaultLocalityGroup() {
+      if (lgb != null) {
+        lgb.finish();
+        locGroups.add(lgb);
+      }
+
+      lgb = new LgBuilder(conf, kvs);
+    }
+  }
+
+  public static Builder builder(SummarizerConfiguration conf, SummarizerFactory factory,
long maxSize) {
+    return new Builder(conf, factory.getSummarizer(conf), maxSize);
+  }
+
+  static void skip(DataInputStream in) throws IOException {
+    boolean exceededMaxSize = in.readBoolean();
+    if (!exceededMaxSize) {
+      long len = WritableUtils.readVInt(in);
+      long skipped = in.skip(len);
+      while (skipped < len) {
+        skipped += in.skip(len - skipped);
+      }
+    }
+  }
+
+  static SummarySerializer load(SummarizerConfiguration sconf, DataInputStream in) throws
IOException {
+    boolean exceededMaxSize = in.readBoolean();
+    if (!exceededMaxSize) {
+      WritableUtils.readVInt(in);
+      // load symbol table
+      int numSymbols = WritableUtils.readVInt(in);
+      String[] symbols = new String[numSymbols];
+      for (int i = 0; i < numSymbols; i++) {
+        symbols[i] = in.readUTF();
+      }
+
+      int numLGroups = WritableUtils.readVInt(in);
+      LgSummaries[] allSummaries = new LgSummaries[numLGroups];
+      for (int i = 0; i < numLGroups; i++) {
+        allSummaries[i] = readLGroup(in, symbols);
+      }
+
+      return new SummarySerializer(sconf, allSummaries);
+    } else {
+      return new SummarySerializer(sconf);
+    }
+  }
+
  /**
   * Deserialized summary data for a single locality group: an ordered sequence of per-slice
   * summaries, where each slice covers the rows from the previous slice's last row (exclusive)
   * up to and including its own last row.
   */
  private static class LgSummaries {

    // First row present in this locality group's data.
    private Text firstRow;
    // Slices in row order; summaries[i].lastRow is the inclusive upper bound of slice i.
    private SummaryInfo[] summaries;
    private String lgroupName;

    LgSummaries(Text firstRow, SummaryInfo[] summaries, String lgroupName) {
      this.firstRow = firstRow;
      this.summaries = summaries;
      this.lgroupName = lgroupName;
    }

    /**
     * Returns true when this group's data extends past the requested row range — i.e. a range
     * endpoint falls inside the span covered by the summaries, so merged totals would include
     * rows outside the range. A null startRow/endRow means unbounded on that side.
     */
    boolean exceedsRange(Text startRow, Text endRow) {

      Text lastRow = summaries[summaries.length - 1].lastRow;
      // startRow lies within [firstRow, lastRow): summarized data before startRow exists
      if (startRow != null && firstRow.compareTo(startRow) <= 0 && startRow.compareTo(lastRow) < 0) {
        return true;
      }

      // endRow lies within [firstRow, lastRow): summarized data after endRow exists
      if (endRow != null && endRow.compareTo(firstRow) >= 0 && lastRow.compareTo(endRow) > 0) {
        return true;
      }

      return false;
    }

    // Pretty-prints every slice and its statistics, one level of indent per nesting level.
    void print(String prefix, String indent, PrintStream out) {
      String p = prefix + indent;
      out.printf("%sLocality group : %s\n", p, lgroupName);
      p += indent;
      for (SummaryInfo si : summaries) {
        out.printf("%sSummary of %d key values (row of last key '%s') : \n", p, si.count, si.lastRow);
        Set<Entry<String,Long>> es = si.summary.entrySet();
        String p2 = p + indent;
        for (Entry<String,Long> entry : es) {
          out.printf("%s%s = %s\n", p2, entry.getKey(), entry.getValue());
        }
      }
    }

    /**
     * Merges into {@code summary} the statistics of every slice whose rows may overlap one of
     * the given ranges. Each slice is merged at most once, even if several ranges touch it.
     */
    void getSummary(List<RowRange> ranges, Combiner combiner, Map<String,Long> summary) {
      boolean[] summariesThatOverlap = new boolean[summaries.length];

      for (RowRange keyExtent : ranges) {
        Text startRow = keyExtent.getStartRow();
        Text endRow = keyExtent.getEndRow();

        // Range ends before this group's first row, so it overlaps nothing here.
        if (endRow != null && endRow.compareTo(firstRow) < 0) {
          continue;
        }

        int start = -1;
        int end = summaries.length - 1;

        if (startRow == null) {
          start = 0;
        } else {
          // First slice whose lastRow is strictly after startRow; earlier slices end at or
          // before startRow (rows after a slice's lastRow belong to later slices).
          for (int i = 0; i < summaries.length; i++) {
            if (startRow.compareTo(summaries[i].getLastRow()) < 0) {
              start = i;
              break;
            }
          }
        }

        // startRow is at or past the last slice's lastRow — range begins after all data.
        if (start == -1) {
          continue;
        }

        if (endRow == null) {
          end = summaries.length - 1;
        } else {
          // First slice whose lastRow is past endRow still contains rows <= endRow, so it is
          // included as the (inclusive) end of the overlapping run.
          for (int i = start; i < summaries.length; i++) {
            if (endRow.compareTo(summaries[i].getLastRow()) < 0) {
              end = i;
              break;
            }
          }
        }

        for (int i = start; i <= end; i++) {
          summariesThatOverlap[i] = true;
        }
      }

      // Merge each overlapping slice exactly once.
      for (int i = 0; i < summaries.length; i++) {
        if (summariesThatOverlap[i]) {
          combiner.merge(summary, summaries[i].summary);
        }
      }
    }
  }
+
+  private static LgSummaries readLGroup(DataInputStream in, String[] symbols) throws IOException
{
+    String lgroupName = in.readUTF();
+
+    // read first row
+    Text firstRow = new Text();
+    firstRow.readFields(in);
+
+    // read summaries
+    int numSummaries = WritableUtils.readVInt(in);
+    SummaryInfo[] summaries = new SummaryInfo[numSummaries];
+    for (int i = 0; i < numSummaries; i++) {
+      int rowLen = WritableUtils.readVInt(in);
+      byte[] row = new byte[rowLen];
+      in.readFully(row);
+      int count = WritableUtils.readVInt(in);
+      Map<String,Long> summary = readSummary(in, symbols);
+      summaries[i] = new SummaryInfo(row, summary, count);
+    }
+
+    return new LgSummaries(firstRow, summaries, lgroupName);
+  }
+
+  private static Map<String,Long> readSummary(DataInputStream in, String[] symbols)
throws IOException {
+    com.google.common.collect.ImmutableMap.Builder<String,Long> imb = ImmutableMap.builder();
+    int numEntries = WritableUtils.readVInt(in);
+
+    for (int i = 0; i < numEntries; i++) {
+      String symbol = symbols[WritableUtils.readVInt(in)];
+      imb.put(symbol, WritableUtils.readVLong(in));
+    }
+
+    return imb.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/summary/SummaryWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryWriter.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryWriter.java
new file mode 100644
index 0000000..1ebeeae
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryWriter.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.hadoop.io.WritableUtils;
+
+public class SummaryWriter implements FileSKVWriter {
+
+  static final String METASTORE_PREFIX = "accumulo.summaries";
+  static final String METASTORE_INDEX = "accumulo.summaries.index";
+
+  // echo "accumulo summarize" | sha1sum | head -c 8
+  static long MAGIC = 0x15ea283ec03e4c49L;
+  static byte VER = 1;
+
+  private FileSKVWriter writer;
+  private SummarySerializer.Builder[] summaryStores;
+
+  private SummaryWriter(FileSKVWriter writer, SummarizerFactory factory, List<SummarizerConfiguration>
configs, long maxSize) {
+    this.writer = writer;
+    int i = 0;
+    summaryStores = new SummarySerializer.Builder[configs.size()];
+    for (SummarizerConfiguration sconf : configs) {
+      summaryStores[i++] = SummarySerializer.builder(sconf, factory, maxSize);
+    }
+  }
+
+  @Override
+  public boolean supportsLocalityGroups() {
+    return true;
+  }
+
+  @Override
+  public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies)
throws IOException {
+    for (SummarySerializer.Builder ssb : summaryStores) {
+      ssb.startNewLocalityGroup(name, columnFamilies);
+    }
+
+    writer.startNewLocalityGroup(name, columnFamilies);
+  }
+
+  @Override
+  public void startDefaultLocalityGroup() throws IOException {
+    for (SummarySerializer.Builder ssb : summaryStores) {
+      ssb.startDefaultLocalityGroup();
+    }
+    writer.startDefaultLocalityGroup();
+  }
+
+  @Override
+  public void append(Key key, Value value) throws IOException {
+    writer.append(key, value);
+    for (SummarySerializer.Builder ssb : summaryStores) {
+      ssb.put(key, value);
+    }
+  }
+
+  @Override
+  public DataOutputStream createMetaStore(String name) throws IOException {
+    return writer.createMetaStore(name);
+  }
+
+  public void writeConfig(SummarizerConfiguration conf, DataOutputStream dos) throws IOException
{
+    // save class (and its config) used to generate summaries
+    dos.writeUTF(conf.getClassName());
+    dos.writeUTF(conf.getPropertyId());
+    WritableUtils.writeVInt(dos, conf.getOptions().size());
+    for (Entry<String,String> entry : conf.getOptions().entrySet()) {
+      dos.writeUTF(entry.getKey());
+      dos.writeUTF(entry.getValue());
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+
+    DataOutputStream out = writer.createMetaStore(METASTORE_INDEX);
+    out.writeLong(MAGIC);
+    out.write(VER);
+    WritableUtils.writeVInt(out, summaryStores.length);
+
+    // Could possibly inline small summaries in the future. Breaking summaries into multiple
block is better for caching a subset of summaries. Also, keeping
+    // the index small is good for the case where summaries that do not exist are requested.
However multiple blocks cause more random I/O in the case when its
+    // not yet in the cache.
+
+    for (int i = 0; i < summaryStores.length; i++) {
+      writeConfig(summaryStores[i].getSummarizerConfiguration(), out);
+      // write if summary is inlined in index... support for possible future optimizations.
+      out.writeBoolean(false);
+      // write pointer to block that will contain summary data
+      WritableUtils.writeVInt(out, i);
+      // write offset of summary data within block. This is not currently used, but it supports
storing multiple summaries in an external block in the
+      // future without changing the code.
+      WritableUtils.writeVInt(out, 0);
+    }
+    out.close();
+
+    for (int i = 0; i < summaryStores.length; i++) {
+      DataOutputStream summaryOut = writer.createMetaStore(METASTORE_PREFIX + "." + i);
+      summaryStores[i].save(summaryOut);
+      summaryOut.close();
+    }
+
+    writer.close();
+  }
+
+  @Override
+  public long getLength() throws IOException {
+    return writer.getLength();
+  }
+
+  public static FileSKVWriter wrap(FileSKVWriter writer, AccumuloConfiguration tableConfig,
boolean useAccumuloStart) {
+    List<SummarizerConfiguration> configs = SummarizerConfigurationUtil.getSummarizerConfigs(tableConfig);
+
+    if (configs.size() == 0) {
+      return writer;
+    }
+
+    SummarizerFactory factory;
+    if (useAccumuloStart) {
+      factory = new SummarizerFactory(tableConfig);
+    } else {
+      factory = new SummarizerFactory();
+    }
+
+    long maxSize = tableConfig.getMemoryInBytes(Property.TABLE_FILE_SUMMARY_MAX_SIZE);
+    return new SummaryWriter(writer, factory, configs, maxSize);
+  }
+}


Mime
View raw message