accumulo-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubbsii <...@git.apache.org>
Subject [GitHub] accumulo pull request #224: ACCUMULO-4500 ACCUMULO-96 Added summarization
Date Tue, 07 Mar 2017 01:08:16 GMT
Github user ctubbsii commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/224#discussion_r104566494
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/summary/SummarySerializer.java ---
    @@ -0,0 +1,543 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.accumulo.core.summary;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.io.PrintStream;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +
    +import org.apache.accumulo.core.client.summary.Summarizer;
    +import org.apache.accumulo.core.client.summary.Summarizer.Collector;
    +import org.apache.accumulo.core.client.summary.Summarizer.Combiner;
    +import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
    +import org.apache.accumulo.core.data.ByteSequence;
    +import org.apache.accumulo.core.data.Key;
    +import org.apache.accumulo.core.data.Value;
    +import org.apache.accumulo.core.summary.Gatherer.RowRange;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.io.WritableUtils;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.ImmutableMap;
    +
    +/**
    + * This class supports serializing summaries and periodically storing summaries. The
    + * implementation attempts to generate around 10 summaries that are evenly spaced. This allows
    + * asking for summaries of sub-ranges of data in an RFile.
    + *
    + * <p>
    + * At first, a summary is created for every 1000 key values. Once more than 10 summaries have
    + * been added (that is, when the count reaches 12), adjacent pairs are merged, leaving 6
    + * summaries, and summaries are then created for every 2000 key values. The code keeps merging
    + * summaries and doubling the number of key values per summary. This results in each summary
    + * covering about the same number of key values.
    + *
    + */
    +
    +class SummarySerializer {
    +
    +  /** Configuration of the summarizer that produced the stored summaries. */
    +  private SummarizerConfiguration sconf;
    +
    +  /** Per locality group summaries, or {@code null} when the maximum size was exceeded. */
    +  private LgSummaries[] allSummaries;
    +
    +  private SummarySerializer(SummarizerConfiguration sconf, LgSummaries[] allSummaries) {
    +    this.sconf = sconf;
    +    this.allSummaries = allSummaries;
    +  }
    +
    +  /**
    +   * Creates an instance for a summary that was not stored because it was too large; the
    +   * missing (null) summary array is what {@link #exceededMaxSize()} reports on.
    +   */
    +  private SummarySerializer(SummarizerConfiguration sconf) {
    +    this(sconf, null);
    +  }
    +
    +  /** @return the configuration of the summarizer these summaries belong to */
    +  public SummarizerConfiguration getSummarizerConfiguration() {
    +    return sconf;
    +  }
    +
    +  /**
    +   * Writes a human readable form of the stored summaries to {@code out}.
    +   *
    +   * @param prefix leading text passed to each locality group's printer
    +   * @param indent indentation passed to each locality group's printer
    +   * @param out destination stream
    +   */
    +  public void print(String prefix, String indent, PrintStream out) {
    +    if (allSummaries != null) {
    +      for (LgSummaries lgSummaries : allSummaries) {
    +        lgSummaries.print(prefix, indent, out);
    +      }
    +      return;
    +    }
    +
    +    // nothing was stored, so only report why
    +    out.printf("%sSummary not stored because it was too large\n", prefix + indent);
    +  }
    +
    +  /**
    +   * Combines the summary data of every locality group for the given row ranges into one map.
    +   * Each locality group receives its own combiner obtained from the summarizer.
    +   *
    +   * <p>
    +   * Callers should check {@link #exceededMaxSize()} first; when it is true there is no summary
    +   * array to iterate and a {@code NullPointerException} results.
    +   *
    +   * @param ranges row ranges of interest
    +   * @param sf factory used to instantiate the summarizer for this configuration
    +   * @return combined statistics keyed by name
    +   */
    +  public Map<String,Long> getSummary(List<RowRange> ranges, SummarizerFactory sf) {
    +    Summarizer summarizer = sf.getSummarizer(sconf);
    +    Map<String,Long> combined = new HashMap<>();
    +
    +    for (int i = 0; i < allSummaries.length; i++) {
    +      allSummaries[i].getSummary(ranges, summarizer.combiner(sconf), combined);
    +    }
    +
    +    return combined;
    +  }
    +
    +  /**
    +   * Reports whether any locality group says its data exceeds any of the given row ranges.
    +   *
    +   * @param ranges row ranges to test
    +   * @return true as soon as one locality group/range combination reports true, else false
    +   */
    +  public boolean exceedsRange(List<RowRange> ranges) {
    +    for (LgSummaries lgSummaries : allSummaries) {
    +      for (RowRange range : ranges) {
    +        if (lgSummaries.exceedsRange(range.getStartRow(), range.getEndRow())) {
    +          return true;
    +        }
    +      }
    +    }
    +    return false;
    +  }
    +
    +  /** @return true when the summary was not stored because it was larger than allowed */
    +  public boolean exceededMaxSize() {
    +    return allSummaries == null;
    +  }
    +
    +  /**
    +   * Statistic consumer that records every accepted statistic into {@link #summaries}. The map
    +   * must be assigned before statistics are delivered.
    +   */
    +  private static class SummaryStoreImpl implements Summarizer.StatisticConsumer {
    +
    +    /** Destination map; the owning builder installs a fresh map before each summarize call. */
    +    HashMap<String,Long> summaries;
    +
    +    @Override
    +    public void accept(String summary, long value) {
    +      summaries.put(summary, value);
    +    }
    +  }
    +
    +  /**
    +   * Accumulates periodic summaries for a single locality group while key values are added. A
    +   * summary is cut every {@code cutoff} key values; when the list of summaries has an even size
    +   * larger than {@code maxSummaries}, adjacent pairs are merged and {@code cutoff} is doubled.
    +   */
    +  private static class LgBuilder {
    +    private Summarizer summarizer;
    +    private SummarizerConfiguration conf;
    +    private Collector collector;
    +
    +    private int maxSummaries = 10;
    +
    +    private int cutoff = 1000;
    +    private int count = 0;
    +
    +    private List<SummaryInfo> summaries = new ArrayList<>();
    +
    +    private Key lastKey;
    +
    +    private SummaryStoreImpl sci = new SummaryStoreImpl();
    +
    +    private String name;
    +
    +    private boolean sawFirst = false;
    +    private Text firstRow;
    +
    +    private boolean finished = false;
    +
    +    /** Creates a builder for the default locality group, named {@code <DEFAULT>}. */
    +    public LgBuilder(SummarizerConfiguration conf, Summarizer kvs) {
    +      this(conf, kvs, "<DEFAULT>");
    +    }
    +
    +    /** Creates a builder for the locality group called {@code name}. */
    +    public LgBuilder(SummarizerConfiguration conf, Summarizer kvs, String name) {
    +      this.conf = conf;
    +      this.summarizer = kvs;
    +      this.name = name;
    +      this.collector = kvs.collector(conf);
    +    }
    +
    +    /**
    +     * Feeds one key value to the current collector. After {@code cutoff} key values, their
    +     * statistics become a summary ending at this key's row and a fresh collector is started.
    +     */
    +    public void put(Key k, Value v) {
    +      collector.accept(k, v);
    +      count++;
    +
    +      if (!sawFirst) {
    +        firstRow = k.getRow();
    +        sawFirst = true;
    +      }
    +
    +      if (count >= cutoff) {
    +        sci.summaries = new HashMap<>();
    +        collector.summarize(sci);
    +        collector = summarizer.collector(conf);
    +        addSummary(k.getRow(), sci.summaries, count);
    +        count = 0;
    +      }
    +
    +      lastKey = k;
    +    }
    +
    +    /**
    +     * Merges adjacent pairs (0 and 1, 2 and 3, ...) of the first {@code end} summaries; {@code end}
    +     * must be even. Each merged entry takes the last row of the second summary in its pair.
    +     */
    +    private List<SummaryInfo> merge(int end) {
    +      List<SummaryInfo> mergedSummaries = new ArrayList<>();
    +      for (int i = 0; i < end; i += 2) {
    +        int mergedCount = summaries.get(i).count + summaries.get(i + 1).count;
    +        // presumably folds the second map into the first, which is then reused as the result
    +        summarizer.combiner(conf).merge(summaries.get(i).summary, summaries.get(i + 1).summary);
    +        mergedSummaries.add(new SummaryInfo(summaries.get(i + 1).getLastRow(),
    +            summaries.get(i).summary, mergedCount));
    +      }
    +      return mergedSummaries;
    +    }
    +
    +    /**
    +     * Appends a summary. When the list size becomes even and exceeds {@code maxSummaries}, all
    +     * pairs are merged and {@code cutoff} doubles so later summaries cover twice as many entries.
    +     */
    +    private void addSummary(Text row, Map<String,Long> summary, int count) {
    +      Preconditions.checkState(!finished);
    +      summaries.add(new SummaryInfo(row, summary, count));
    +
    +      if (summaries.size() % 2 == 0 && summaries.size() > maxSummaries) {
    +        summaries = merge(summaries.size());
    +        cutoff *= 2;
    +      }
    +    }
    +
    +    /**
    +     * Halves the number of summaries by merging adjacent pairs; an odd trailing summary is kept
    +     * unchanged. Only valid after {@link #finish()}.
    +     *
    +     * @return false if there was at most one summary (nothing to merge), true otherwise
    +     */
    +    boolean collapse() {
    +      Preconditions.checkState(finished);
    +      if (summaries.size() <= 1) {
    +        return false;
    +      }
    +
    +      int end = summaries.size();
    +      if (end % 2 == 1) {
    +        end--;
    +      }
    +
    +      List<SummaryInfo> mergedSummaries = merge(end);
    +
    +      if (summaries.size() % 2 == 1) {
    +        mergedSummaries.add(summaries.get(summaries.size() - 1));
    +      }
    +
    +      summaries = mergedSummaries;
    +
    +      return true;
    +    }
    +
    +    /**
    +     * Summarizes any key values added since the last cutoff and marks this builder finished, which
    +     * {@link #collapse()} requires.
    +     */
    +    void finish() {
    +      Preconditions.checkState(!finished);
    +      // summarize last data
    +      if (count > 0) {
    +        sci.summaries = new HashMap<>();
    +        collector.summarize(sci);
    +        collector = null;
    +        addSummary(lastKey.getRow(), sci.summaries, count);
    +        count = 0;
    +      }
    +      // Must be set even when count == 0 (no data at all, or the last put() landed exactly on a
    +      // cutoff); otherwise a later collapse() fails its checkState(finished) precondition.
    +      finished = true;
    +    }
    +
    +    /**
    +     * Writes the locality group name, its first row (a zero length when no data was seen), the
    +     * number of summaries, and for each summary its last row, key value count and statistics.
    +     */
    +    public void save(DataOutputStream dos, HashMap<String,Integer> symbolTable) throws IOException {
    +      Preconditions.checkState(count == 0);
    +
    +      dos.writeUTF(name);
    +
    +      if (firstRow == null) {
    +        WritableUtils.writeVInt(dos, 0);
    +      } else {
    +        firstRow.write(dos);
    +      }
    +
    +      // write summaries
    +      WritableUtils.writeVInt(dos, summaries.size());
    +      for (SummaryInfo summaryInfo : summaries) {
    +        summaryInfo.getLastRow().write(dos);
    +        WritableUtils.writeVInt(dos, summaryInfo.count);
    +        saveSummary(dos, symbolTable, summaryInfo.summary);
    +      }
    +    }
    +
    +    /**
    +     * Writes the statistic count followed by (symbol id, value) pairs. Every statistic name must
    +     * be present in {@code symbolTable}.
    +     */
    +    private void saveSummary(DataOutputStream dos, HashMap<String,Integer> symbolTable,
    +        Map<String,Long> summary) throws IOException {
    +      WritableUtils.writeVInt(dos, summary.size());
    +      for (Entry<String,Long> e : summary.entrySet()) {
    +        WritableUtils.writeVInt(dos, symbolTable.get(e.getKey()));
    +        WritableUtils.writeVLong(dos, e.getValue());
    +      }
    +    }
    +  }
    +
    +  public static class Builder {
    +    /** Summarizer shared by the locality group builders. */
    +    private Summarizer kvs;
    +
    +    private SummarizerConfiguration conf;
    +
    +    /** Locality group builders that will be serialized by {@link #save(DataOutputStream)}. */
    +    private List<LgBuilder> locGroups;
    +
    +    /** Builder for the locality group currently receiving key values. */
    +    private LgBuilder lgb;
    +
    +    /** Upper bound, in bytes, on the serialized summary data. */
    +    private long maxSize;
    +
    +    public Builder(SummarizerConfiguration conf, Summarizer kvs, long maxSize) {
    +      this.locGroups = new ArrayList<>();
    +      this.conf = conf;
    +      this.kvs = kvs;
    +      this.maxSize = maxSize;
    +    }
    +
    +    /**
    +     * Adds a key value to the current locality group. Assumes a locality group builder has been
    +     * started (code not shown here) — otherwise this throws a NullPointerException.
    +     */
    +    public void put(Key k, Value v) {
    +      lgb.put(k, v);
    +    }
    +
    +    /** @return the summarizer configuration this builder was created with */
    +    public SummarizerConfiguration getSummarizerConfiguration() {
    +      return conf;
    +    }
    +
    +    /**
    +     * Finishes the current locality group and writes the summaries to {@code dos}. While the
    +     * serialized form is larger than {@code maxSize}, every locality group is collapsed once and
    +     * the data is re-serialized, until it fits or no group can collapse any further.
    +     *
    +     * <p>
    +     * Output: a boolean that is {@code true} if the data is still too large (nothing else is
    +     * written), otherwise {@code false}, a vint length, and the serialized bytes.
    +     */
    +    public void save(DataOutputStream dos) throws IOException {
    +      if (lgb != null) {
    +        lgb.finish();
    +        locGroups.add(lgb);
    +      }
    +
    +      byte[] data = _save();
    +      boolean shrank = true;
    +      while (data.length > maxSize && shrank) {
    +        shrank = false;
    +        // every group must be collapsed, so no short-circuiting here
    +        for (LgBuilder lgBuilder : locGroups) {
    +          shrank |= lgBuilder.collapse();
    +        }
    +        if (shrank) {
    +          data = _save();
    +        }
    +      }
    +
    +      boolean tooLarge = data.length > maxSize;
    +      dos.writeBoolean(tooLarge);
    +      if (!tooLarge) {
    +        // the length prefix lets readers skip over the summary data efficiently
    +        WritableUtils.writeVInt(dos, data.length);
    +        dos.write(data);
    +      }
    +    }
    +
    +    private byte[] _save() throws IOException {
    +
    +      ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +      DataOutputStream dos = new DataOutputStream(baos);
    +
    +      // create a symbol table
    +      HashMap<String,Integer> symbolTable = new HashMap<>();
    +      ArrayList<String> symbols = new ArrayList<>();
    +      for (LgBuilder lg : locGroups) {
    +        for (SummaryInfo si : lg.summaries) {
    +          for (String symbol : si.summary.keySet()) {
    +            if (!symbolTable.containsKey(symbol)) {
    +              symbolTable.put(symbol, symbols.size());
    +              symbols.add(symbol);
    +            }
    +          }
    +        }
    +      }
    +
    +      // write symbol table
    +      WritableUtils.writeVInt(dos, symbols.size());
    +      for (String symbol : symbols) {
    +        dos.writeUTF(symbol);
    +      }
    +
    +      WritableUtils.writeVInt(dos, locGroups.size());
    +      for (LgBuilder lg : locGroups) {
    +        lg.save(dos, symbolTable);
    +      }
    +
    +      dos.close();
    --- End diff --
    
    Using try-with-resources here would be even better.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message