accumulo-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joshelser <...@git.apache.org>
Subject [GitHub] accumulo pull request #224: ACCUMULO-4500 ACCUMULO-96 Added summarization
Date Tue, 07 Mar 2017 00:50:11 GMT
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/224#discussion_r104519924
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/summary/SummarySerializer.java ---
    @@ -0,0 +1,543 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.accumulo.core.summary;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.io.PrintStream;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +
    +import org.apache.accumulo.core.client.summary.Summarizer;
    +import org.apache.accumulo.core.client.summary.Summarizer.Collector;
    +import org.apache.accumulo.core.client.summary.Summarizer.Combiner;
    +import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
    +import org.apache.accumulo.core.data.ByteSequence;
    +import org.apache.accumulo.core.data.Key;
    +import org.apache.accumulo.core.data.Value;
    +import org.apache.accumulo.core.summary.Gatherer.RowRange;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.io.WritableUtils;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.ImmutableMap;
    +
    +/**
    + * This class supports serializing summaries and periodically storing summaries. The
    + * implementation attempts to generate around 10 evenly spaced summaries. This allows asking for
    + * summaries of sub-ranges of data in an rfile.
    + *
    + * <p>
    + * At first, a summary is created for every 1000 key values. Once more than 10 summaries have
    + * been added (in practice, when the 12th one is added), adjacent pairs are merged, halving the
    + * number of summaries, and new summaries are then created for every 2000 key values. The code
    + * keeps merging summaries and doubling the number of key values per summary, so each summary
    + * covers about the same number of key values.
    + *
    + */
    +
    +class SummarySerializer {
    +
    +  private SummarizerConfiguration sconf;
    +  private LgSummaries[] allSummaries;
    +
    +  /**
    +   * Creates a serializer over the given summaries. A {@code null} array means the summaries were
    +   * not stored because they were too large.
    +   *
    +   * @param config summarizer configuration the summaries belong to
    +   * @param summaries stored summaries, or {@code null} when the max size was exceeded
    +   */
    +  private SummarySerializer(SummarizerConfiguration config, LgSummaries[] summaries) {
    +    sconf = config;
    +    allSummaries = summaries;
    +  }
    +
    +  /**
    +   * Creates a serializer that keeps only the configuration because the summaries exceeded the max
    +   * size and were not stored.
    +   *
    +   * @param sconf summarizer configuration the (unstored) summaries belong to
    +   */
    +  private SummarySerializer(SummarizerConfiguration sconf) {
    +    // a null summaries array indicates max size was exceeded
    +    this(sconf, null);
    +  }
    +
    +  /**
    +   * Returns the summarizer configuration supplied when this serializer was constructed.
    +   *
    +   * @return the summarizer configuration
    +   */
    +  public SummarizerConfiguration getSummarizerConfiguration() {
    +    return this.sconf;
    +  }
    +
    +  /**
    +   * Prints the stored summaries to {@code out}.
    +   *
    +   * <p>
    +   * When the summaries were too large to store, a single notice is printed with
    +   * {@code prefix + indent} in front of it. Otherwise {@code prefix} and {@code indent} are passed
    +   * through to each stored group's own {@code print} method.
    +   *
    +   * @param prefix leading text for printed lines
    +   * @param indent additional indentation
    +   * @param out destination stream
    +   */
    +  public void print(String prefix, String indent, PrintStream out) {
    +    if (allSummaries == null) {
    +      out.printf("%sSummary not stored because it was too large\n", prefix + indent);
    +      return;
    +    }
    +
    +    for (LgSummaries lgSummaries : allSummaries) {
    +      lgSummaries.print(prefix, indent, out);
    +    }
    +  }
    +
    +  /**
    +   * Builds one summary map for the given row ranges. Each stored group of summaries contributes,
    +   * and the configured summarizer's combiner merges their statistics.
    +   *
    +   * @param ranges row ranges to summarize
    +   * @param sf factory used to obtain the summarizer for this configuration
    +   * @return the merged statistics
    +   * @throws IllegalStateException if the summaries were not stored because they exceeded the max
    +   *           size (see {@link #exceededMaxSize()})
    +   */
    +  public Map<String,Long> getSummary(List<RowRange> ranges, SummarizerFactory sf) {
    +    // allSummaries is null when the max size was exceeded; fail with a clear message instead of
    +    // an NPE from the loop below
    +    Preconditions.checkState(allSummaries != null,
    +        "Summary was not stored because it was too large; check exceededMaxSize() first");
    +
    +    Summarizer kvs = sf.getSummarizer(sconf);
    +    // the combiner does not depend on the loop variable, so obtain it once
    +    Combiner combiner = kvs.combiner(sconf);
    +
    +    Map<String,Long> summary = new HashMap<>();
    +    for (LgSummaries lgs : allSummaries) {
    +      lgs.getSummary(ranges, combiner, summary);
    +    }
    +    return summary;
    +  }
    +
    +  /**
    +   * Checks every stored group of summaries against every given row range.
    +   *
    +   * @param ranges row ranges to check
    +   * @return {@code true} as soon as any group's {@code exceedsRange} check succeeds for any range,
    +   *         otherwise {@code false}
    +   * @throws IllegalStateException if the summaries were not stored because they exceeded the max
    +   *           size (see {@link #exceededMaxSize()})
    +   */
    +  public boolean exceedsRange(List<RowRange> ranges) {
    +    // allSummaries is null when the max size was exceeded; fail with a clear message instead of
    +    // an NPE from the loop below
    +    Preconditions.checkState(allSummaries != null,
    +        "Summary was not stored because it was too large; check exceededMaxSize() first");
    +
    +    for (LgSummaries lgs : allSummaries) {
    +      for (RowRange range : ranges) {
    +        if (lgs.exceedsRange(range.getStartRow(), range.getEndRow())) {
    +          return true;
    +        }
    +      }
    +    }
    +    return false;
    +  }
    +
    +  /**
    +   * Reports whether the summaries were omitted because they exceeded the max size.
    +   *
    +   * @return {@code true} when no summaries are stored
    +   */
    +  public boolean exceededMaxSize() {
    +    return null == allSummaries;
    +  }
    +
    +  /**
    +   * Statistic consumer that records every statistic it receives into {@code summaries}. The map
    +   * must be assigned before use; {@code LgBuilder.put} assigns a fresh map before each call to
    +   * {@code collector.summarize}.
    +   */
    +  private static class SummaryStoreImpl implements Summarizer.StatisticConsumer {
    +
    +    // destination for accepted statistics; assigned by the caller before each use
    +    HashMap<String,Long> summaries;
    +
    +    @Override
    +    public void accept(String statName, long statValue) {
    +      summaries.put(statName, statValue);
    +    }
    +  }
    +
    +  private static class LgBuilder {
    +    // creates the collectors (put) and combiners (merge) used by this builder
    +    private Summarizer summarizer;
    +    private SummarizerConfiguration conf;
    +    // collects key values seen since the last stored summary; replaced after each summary
    +    private Collector collector;
    +
    +    // once the stored summary count is even and exceeds this, adjacent pairs are merged
    +    // (see addSummary)
    +    private int maxSummaries = 10;
    +
    +    // number of key values per summary; doubled after every merge
    +    private int cutoff = 1000;
    +    // key values accepted by the current collector; reset to 0 after each summary
    +    private int count = 0;
    +
    +    // stored summaries in the order they were added
    +    private List<SummaryInfo> summaries = new ArrayList<>();
    +
    +    // most recent key passed to put
    +    private Key lastKey;
    +
    +    // reused consumer that receives the current collector's statistics
    +    private SummaryStoreImpl sci = new SummaryStoreImpl();
    +
    +    // group name; "<DEFAULT>" when the two-argument constructor is used
    +    private String name;
    +
    +    // row of the first key passed to put, and whether it has been recorded yet
    +    private boolean sawFirst = false;
    +    private Text firstRow;
    +
    +    // addSummary requires false and collapse requires true; where it is set is not visible in
    +    // this excerpt
    +    private boolean finished = false;
    +
    +    /**
    +     * Creates a builder whose name is {@code "<DEFAULT>"}.
    +     *
    +     * @param conf configuration handed to the summarizer when creating collectors and combiners
    +     * @param kvs summarizer that creates collectors and combiners
    +     */
    +    public LgBuilder(SummarizerConfiguration conf, Summarizer kvs) {
    +      this(conf, kvs, "<DEFAULT>");
    +    }
    +
    +    /**
    +     * Creates a builder with the given name and an initial collector.
    +     *
    +     * @param config configuration handed to the summarizer when creating collectors and combiners
    +     * @param kvs summarizer that creates collectors and combiners
    +     * @param groupName name identifying this builder (presumably a locality group name)
    +     */
    +    public LgBuilder(SummarizerConfiguration config, Summarizer kvs, String groupName) {
    +      conf = config;
    +      summarizer = kvs;
    +      name = groupName;
    +      collector = summarizer.collector(conf);
    +    }
    +
    +    public void put(Key k, Value v) {
    +      collector.accept(k, v);
    +      count++;
    +
    +      if (!sawFirst) {
    +        firstRow = k.getRow();
    +        sawFirst = true;
    +
    +      }
    +
    +      if (count >= cutoff) {
    +        sci.summaries = new HashMap<>();
    +        collector.summarize(sci);
    +        collector = summarizer.collector(conf);
    +        addSummary(k.getRow(), sci.summaries, count);
    +        count = 0;
    +      }
    +
    +      lastKey = k;
    +    }
    +
    +    /**
    +     * Merges adjacent pairs {@code (i, i + 1)} of stored summaries, for even {@code i} below
    +     * {@code end}, into a new list.
    +     *
    +     * <p>
    +     * Each merged entry keeps the second entry's last row and the sum of both counts. The combiner
    +     * is applied to both maps, and the first entry's map is reused as the merged map. The stored
    +     * list itself is not modified. NOTE(review): for an odd {@code end}, the last pair reads index
    +     * {@code end}, which must exist.
    +     *
    +     * @param end exclusive bound on the pair start indices
    +     * @return the merged summaries
    +     */
    +    private List<SummaryInfo> merge(int end) {
    +      List<SummaryInfo> merged = new ArrayList<>();
    +      int i = 0;
    +      while (i < end) {
    +        SummaryInfo first = summaries.get(i);
    +        SummaryInfo second = summaries.get(i + 1);
    +        int combinedCount = first.count + second.count;
    +        // the combiner is expected to fold second's statistics into first's map, which is reused
    +        summarizer.combiner(conf).merge(first.summary, second.summary);
    +        merged.add(new SummaryInfo(second.getLastRow(), first.summary, combinedCount));
    +        i += 2;
    +      }
    +      return merged;
    +    }
    +
    +    /**
    +     * Stores a summary. When the stored count becomes even and exceeds {@code maxSummaries},
    +     * adjacent pairs are merged and {@code cutoff} doubles, so later summaries cover twice as many
    +     * key values.
    +     *
    +     * @param row last row covered by the summary
    +     * @param summary statistics for the summary
    +     * @param count number of key values the summary covers
    +     * @throws IllegalStateException if the builder is already finished
    +     */
    +    private void addSummary(Text row, Map<String,Long> summary, int count) {
    +      Preconditions.checkState(!finished);
    +      summaries.add(new SummaryInfo(row, summary, count));
    +
    +      int size = summaries.size();
    +      boolean shouldMerge = size > maxSummaries && size % 2 == 0;
    +      if (!shouldMerge) {
    +        return;
    +      }
    +
    +      summaries = merge(size);
    +      cutoff *= 2;
    +    }
    +
    +    boolean collapse() {
    +      Preconditions.checkState(finished);
    +      if (summaries.size() <= 1) {
    +        return false;
    +      }
    +
    +      int end = summaries.size();
    +      if (end % 2 == 1) {
    --- End diff --
    
    Is there a lower bound on when you want to do this (aside from the `<= 1` sanity check above)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message