accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [2/2] git commit: ACCUMULO-2202 catch all runtime exceptions, not just NPE
Date Thu, 16 Jan 2014 15:42:06 GMT
ACCUMULO-2202 catch all runtime exceptions, not just NPE


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/47ca312c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/47ca312c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/47ca312c

Branch: refs/heads/1.5.1-SNAPSHOT
Commit: 47ca312c950042c4b277a98cbb4eed8a37bf9afd
Parents: d183132 91be551
Author: Eric Newton <eric.newton@gmail.com>
Authored: Thu Jan 16 10:41:31 2014 -0500
Committer: Eric Newton <eric.newton@gmail.com>
Committed: Thu Jan 16 10:41:31 2014 -0500

----------------------------------------------------------------------
 .../java/org/apache/accumulo/core/file/BloomFilterLayer.java  | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/47ca312c/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
index 2636126,0000000..e79da37
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
@@@ -1,502 -1,0 +1,501 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.accumulo.core.file;
 +
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.io.PrintStream;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.concurrent.BlockingQueue;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.LinkedBlockingQueue;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.bloomfilter.DynamicBloomFilter;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.ConfigurationCopy;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.file.keyfunctor.KeyFunctor;
 +import org.apache.accumulo.core.file.rfile.RFile;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.core.util.LoggingRunnable;
 +import org.apache.accumulo.core.util.NamingThreadFactory;
 +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.util.bloom.Key;
 +import org.apache.hadoop.util.hash.Hash;
 +import org.apache.log4j.Logger;
 +
/**
 * A class that sits on top of different accumulo file formats and provides bloom filter
 * functionality.
 */
public class BloomFilterLayer {
  private static final Logger LOG = Logger.getLogger(BloomFilterLayer.class);
  // Name of the per-file metadata store in which the serialized bloom filter is kept.
  public static final String BLOOM_FILE_NAME = "acu_bloom";
  // Number of hash functions used when constructing the bloom filter.
  public static final int HASH_COUNT = 5;
  
  // Shared, lazily created pool used to load bloom filters in the background.
  // Guarded by the class lock in getLoadThreadPool(); stays null when background
  // loading is disabled (max load threads <= 0).
  private static ExecutorService loadThreadPool = null;
  
 +  private static synchronized ExecutorService getLoadThreadPool(int maxLoadThreads) {
 +    if (loadThreadPool != null) {
 +      return loadThreadPool;
 +    }
 +    
 +    if (maxLoadThreads > 0) {
 +      BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
 +      loadThreadPool = new ThreadPoolExecutor(0, maxLoadThreads, 60, TimeUnit.SECONDS, q,
new NamingThreadFactory("bloom-loader"));
 +    }
 +    
 +    return loadThreadPool;
 +  }
 +  
 +  public static class Writer implements FileSKVWriter {
 +    private DynamicBloomFilter bloomFilter;
 +    private int numKeys;
 +    private int vectorSize;
 +    
 +    private FileSKVWriter writer;
 +    private KeyFunctor transformer = null;
 +    private boolean closed = false;
 +    
 +    Writer(FileSKVWriter writer, AccumuloConfiguration acuconf) {
 +      this.writer = writer;
 +      initBloomFilter(acuconf);
 +    }
 +    
 +    private synchronized void initBloomFilter(AccumuloConfiguration acuconf) {
 +      
 +      numKeys = acuconf.getCount(Property.TABLE_BLOOM_SIZE);
 +      // vector size should be <code>-kn / (ln(1 - c^(1/k)))</code> bits for
 +      // single key, where <code> is the number of hash functions,
 +      // <code>n</code> is the number of keys and <code>c</code>
is the desired
 +      // max. error rate.
 +      // Our desired error rate is by default 0.005, i.e. 0.5%
 +      double errorRate = acuconf.getFraction(Property.TABLE_BLOOM_ERRORRATE);
 +      vectorSize = (int) Math.ceil(-HASH_COUNT * numKeys / Math.log(1.0 - Math.pow(errorRate,
1.0 / HASH_COUNT)));
 +      bloomFilter = new DynamicBloomFilter(vectorSize, HASH_COUNT, Hash.parseHashType(acuconf.get(Property.TABLE_BLOOM_HASHTYPE)),
numKeys);
 +      
 +      /**
 +       * load KeyFunctor
 +       */
 +      try {
 +        Class<? extends KeyFunctor> clazz = AccumuloVFSClassLoader.loadClass(acuconf.get(Property.TABLE_BLOOM_KEY_FUNCTOR),
KeyFunctor.class);
 +        transformer = clazz.newInstance();
 +        
 +      } catch (Exception e) {
 +        LOG.error("Failed to find KeyFunctor: " + acuconf.get(Property.TABLE_BLOOM_KEY_FUNCTOR),
e);
 +        throw new IllegalArgumentException("Failed to find KeyFunctor: " + acuconf.get(Property.TABLE_BLOOM_KEY_FUNCTOR));
 +        
 +      }
 +      
 +    }
 +    
 +    @Override
 +    public synchronized void append(org.apache.accumulo.core.data.Key key, Value val) throws
IOException {
 +      writer.append(key, val);
 +      Key bloomKey = transformer.transform(key);
 +      if (bloomKey.getBytes().length > 0)
 +        bloomFilter.add(bloomKey);
 +    }
 +    
 +    @Override
 +    public synchronized void close() throws IOException {
 +      
 +      if (closed)
 +        return;
 +      
 +      DataOutputStream out = writer.createMetaStore(BLOOM_FILE_NAME);
 +      out.writeUTF(transformer.getClass().getCanonicalName());
 +      bloomFilter.write(out);
 +      out.flush();
 +      out.close();
 +      writer.close();
 +      closed = true;
 +    }
 +    
 +    @Override
 +    public DataOutputStream createMetaStore(String name) throws IOException {
 +      return writer.createMetaStore(name);
 +    }
 +    
 +    @Override
 +    public void startDefaultLocalityGroup() throws IOException {
 +      writer.startDefaultLocalityGroup();
 +      
 +    }
 +    
 +    @Override
 +    public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies)
throws IOException {
 +      writer.startNewLocalityGroup(name, columnFamilies);
 +    }
 +    
 +    @Override
 +    public boolean supportsLocalityGroups() {
 +      return writer.supportsLocalityGroups();
 +    }
 +  }
 +  
  /**
   * Loads a file's bloom filter (written by {@link Writer}) lazily, optionally in a
   * background thread, and answers membership queries for ranges.
   *
   * Thread-safety: bloomFilter and transformer are volatile; bloomFilter is published
   * only after it is fully read, and transformer is assigned before bloomFilter, so a
   * reader that observes a non-null bloomFilter also observes a non-null transformer.
   */
  static class BloomFilterLoader {
    
    private volatile DynamicBloomFilter bloomFilter;
    // Number of probablyHasKey/initiateLoad calls seen so far; loading starts once
    // this reaches loadThreshold. Guarded by "synchronized" on initiateLoad.
    private int loadRequest = 0;
    private int loadThreshold = 1;
    private int maxLoadThreads;
    // Non-null until the load has been initiated exactly once; then set to null.
    private Runnable loadTask;
    private volatile KeyFunctor transformer = null;
    private volatile boolean closed = false;
    
    BloomFilterLoader(final FileSKVIterator reader, AccumuloConfiguration acuconf) {
      
      maxLoadThreads = acuconf.getCount(Property.TSERV_BLOOM_LOAD_MAXCONCURRENT);
      
      loadThreshold = acuconf.getCount(Property.TABLE_BLOOM_LOAD_THRESHOLD);
      
      loadTask = new Runnable() {
        public void run() {
          
          // no need to load the bloom filter if the map file is closed
          if (closed)
            return;
          String ClassName = null;
          DataInputStream in = null;
          
          try {
            in = reader.getMetaStore(BLOOM_FILE_NAME);
            DynamicBloomFilter tmpBloomFilter = new DynamicBloomFilter();
            
            // check for closed again after open but before reading the bloom filter in
            if (closed)
              return;
            
            /**
             * Load classname for keyFunctor
             */
            ClassName = in.readUTF();
            
            Class<? extends KeyFunctor> clazz = AccumuloVFSClassLoader.loadClass(ClassName, KeyFunctor.class);
            transformer = clazz.newInstance();
            
            /**
             * read in bloom filter
             */
            
            tmpBloomFilter.readFields(in);
            // only set the bloom filter after it is fully constructed
            bloomFilter = tmpBloomFilter;
          } catch (NoSuchMetaStoreException nsme) {
            // file does not have a bloom filter, ignore it
          } catch (IOException ioe) {
            if (!closed)
              LOG.warn("Can't open BloomFilter", ioe);
            else
              LOG.debug("Can't open BloomFilter, file closed : " + ioe.getMessage());
            
            bloomFilter = null;
          } catch (ClassNotFoundException e) {
            LOG.error("Failed to find KeyFunctor in config: " + ClassName, e);
            bloomFilter = null;
          } catch (InstantiationException e) {
            LOG.error("Could not instantiate KeyFunctor: " + ClassName, e);
            bloomFilter = null;
          } catch (IllegalAccessException e) {
            LOG.error("Illegal acess exception", e);
            bloomFilter = null;
          } catch (RuntimeException rte) {
            // ACCUMULO-2202: after close() the underlying reader may be torn down,
            // so any runtime exception is expected and only logged; before close it
            // is a genuine error and is rethrown.
            if (!closed)
              throw rte;
            else
              LOG.debug("Can't open BloomFilter, RTE after closed ", rte);
          } finally {
            if (in != null) {
              try {
                in.close();
              } catch (IOException e) {
                LOG.warn("Failed to close ", e);
              }
            }
          }
        }
      };
      
      initiateLoad(maxLoadThreads);
      
    }
    
    private synchronized void initiateLoad(int maxLoadThreads) {
      // ensure only one thread initiates loading of bloom filter by
      // only taking action when loadTask != null
      if (loadTask != null && loadRequest >= loadThreshold) {
        try {
          ExecutorService ltp = getLoadThreadPool(maxLoadThreads);
          if (ltp == null) {
            // load the bloom filter in the foreground
            loadTask.run();
          } else {
            // load the bloom filter in the background
            ltp.execute(new LoggingRunnable(LOG, loadTask));
          }
        } finally {
          // set load task to null so no one else can initiate the load
          loadTask = null;
        }
      }
      
      loadRequest++;
    }
    
    /**
     * Checks if this {@link RFile} contains keys from this range. The membership test is
     * performed using a Bloom filter, so the result has always non-zero probability of
     * false positives.
     * 
     * @param range
     *          range of keys to check
     * @return false iff key doesn't exist, true if key probably exists.
     * @throws IOException
     */
    boolean probablyHasKey(Range range) throws IOException {
      // Filter not loaded (or load failed): answer "maybe" so the caller falls back
      // to seeking the underlying file; each call also counts toward loadThreshold.
      if (bloomFilter == null) {
        initiateLoad(maxLoadThreads);
        if (bloomFilter == null)
          return true;
      }
      
      Key bloomKey = transformer.transform(range);
      
      // Ranges the functor cannot map to a single bloom key cannot be ruled out.
      if (bloomKey == null || bloomKey.getBytes().length == 0)
        return true;
      
      return bloomFilter.membershipTest(bloomKey);
    }
    
    /**
     * Marks this loader closed; an in-flight background load will abandon its work
     * at the next closed check.
     */
    public void close() {
      this.closed = true;
    }
  }
 +  
 +  public static class Reader implements FileSKVIterator {
 +    
 +    private BloomFilterLoader bfl;
 +    private FileSKVIterator reader;
 +    
 +    public Reader(FileSKVIterator reader, AccumuloConfiguration acuconf) {
 +      this.reader = reader;
 +      bfl = new BloomFilterLoader(reader, acuconf);
 +    }
 +    
 +    private Reader(FileSKVIterator src, BloomFilterLoader bfl) {
 +      this.reader = src;
 +      this.bfl = bfl;
 +    }
 +    
 +    private boolean checkSuper = true;
 +    
 +    @Override
 +    public boolean hasTop() {
 +      return checkSuper ? reader.hasTop() : false;
 +    }
 +    
 +    @Override
 +    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean
inclusive) throws IOException {
 +      
 +      if (!bfl.probablyHasKey(range)) {
 +        checkSuper = false;
 +      } else {
 +        reader.seek(range, columnFamilies, inclusive);
 +        checkSuper = true;
 +      }
 +    }
 +    
 +    public synchronized void close() throws IOException {
 +      bfl.close();
 +      reader.close();
 +    }
 +    
 +    @Override
 +    public org.apache.accumulo.core.data.Key getFirstKey() throws IOException {
 +      return reader.getFirstKey();
 +    }
 +    
 +    @Override
 +    public org.apache.accumulo.core.data.Key getLastKey() throws IOException {
 +      return reader.getLastKey();
 +    }
 +    
 +    @Override
 +    public SortedKeyValueIterator<org.apache.accumulo.core.data.Key,Value> deepCopy(IteratorEnvironment
env) {
 +      return new BloomFilterLayer.Reader((FileSKVIterator) reader.deepCopy(env), bfl);
 +    }
 +    
 +    @Override
 +    public org.apache.accumulo.core.data.Key getTopKey() {
 +      return reader.getTopKey();
 +    }
 +    
 +    @Override
 +    public Value getTopValue() {
 +      return reader.getTopValue();
 +    }
 +    
 +    @Override
 +    public void init(SortedKeyValueIterator<org.apache.accumulo.core.data.Key,Value>
source, Map<String,String> options, IteratorEnvironment env)
 +        throws IOException {
 +      throw new UnsupportedOperationException();
 +      
 +    }
 +    
 +    @Override
 +    public void next() throws IOException {
 +      reader.next();
 +    }
 +    
 +    @Override
 +    public DataInputStream getMetaStore(String name) throws IOException {
 +      return reader.getMetaStore(name);
 +    }
 +    
 +    @Override
 +    public void closeDeepCopies() throws IOException {
 +      reader.closeDeepCopies();
 +    }
 +    
 +    @Override
 +    public void setInterruptFlag(AtomicBoolean flag) {
 +      reader.setInterruptFlag(flag);
 +    }
 +    
 +  }
 +  
  /**
   * Ad-hoc benchmark harness: writes ~100k random rows to /tmp through a bloom-filtered
   * writer, then measures random-lookup and existing-key-lookup rates through the
   * bloom-filtered reader.
   */
  public static void main(String[] args) throws IOException {
    PrintStream out = System.out;
    
    Random r = new Random();
    
    // Random, de-duplicated set of row numbers to write.
    HashSet<Integer> valsSet = new HashSet<Integer>();
    
    for (int i = 0; i < 100000; i++) {
      valsSet.add(r.nextInt(Integer.MAX_VALUE));
    }
    
    ArrayList<Integer> vals = new ArrayList<Integer>(valsSet);
    Collections.sort(vals);
    
    // Enable bloom filters keyed on (row, column family) with an eager load threshold.
    ConfigurationCopy acuconf = new ConfigurationCopy(AccumuloConfiguration.getDefaultConfiguration());
    acuconf.set(Property.TABLE_BLOOM_ENABLED, "true");
    acuconf.set(Property.TABLE_BLOOM_KEY_FUNCTOR, "accumulo.core.file.keyfunctor.ColumnFamilyFunctor");
    acuconf.set(Property.TABLE_FILE_TYPE, RFile.EXTENSION);
    acuconf.set(Property.TABLE_BLOOM_LOAD_THRESHOLD, "1");
    acuconf.set(Property.TSERV_BLOOM_LOAD_MAXCONCURRENT, "1");
    
    Configuration conf = CachedConfiguration.getInstance();
    FileSystem fs = FileSystem.get(conf);
    
    String suffix = FileOperations.getNewFileExtension(acuconf);
    String fname = "/tmp/test." + suffix;
    FileSKVWriter bmfw = FileOperations.getInstance().openWriter(fname, fs, conf, acuconf);
    
    long t1 = System.currentTimeMillis();
    
    bmfw.startDefaultLocalityGroup();
    
    // Two column families per row; keys must be appended in sorted order.
    for (Integer i : vals) {
      String fi = String.format("%010d", i);
      bmfw.append(new org.apache.accumulo.core.data.Key(new Text("r" + fi), new Text("cf1")), new Value(("v" + fi).getBytes()));
      bmfw.append(new org.apache.accumulo.core.data.Key(new Text("r" + fi), new Text("cf2")), new Value(("v" + fi).getBytes()));
    }
    
    long t2 = System.currentTimeMillis();
    
    out.printf("write rate %6.2f%n", vals.size() / ((t2 - t1) / 1000.0));
    
    bmfw.close();
    
    t1 = System.currentTimeMillis();
    FileSKVIterator bmfr = FileOperations.getInstance().openReader(fname, false, fs, conf, acuconf);
    t2 = System.currentTimeMillis();
    out.println("Opened " + fname + " in " + (t2 - t1));
    
    t1 = System.currentTimeMillis();
    
    // Random lookups: most rows are absent, so the bloom filter should short-circuit
    // most seeks. A present row with no top is a correctness error.
    int hits = 0;
    for (int i = 0; i < 5000; i++) {
      int row = r.nextInt(Integer.MAX_VALUE);
      String fi = String.format("%010d", row);
      // bmfr.seek(new Range(new Text("r"+fi)));
      org.apache.accumulo.core.data.Key k1 = new org.apache.accumulo.core.data.Key(new Text("r" + fi), new Text("cf1"));
      bmfr.seek(new Range(k1, true, k1.followingKey(PartialKey.ROW_COLFAM), false), new ArrayList<ByteSequence>(), false);
      if (valsSet.contains(row)) {
        hits++;
        if (!bmfr.hasTop()) {
          out.println("ERROR " + row);
        }
      }
    }
    
    t2 = System.currentTimeMillis();
    
    out.printf("random lookup rate : %6.2f%n", 5000 / ((t2 - t1) / 1000.0));
    out.println("hits = " + hits);
    
    int count = 0;
    
    t1 = System.currentTimeMillis();
    
    // Lookups of 500 rows known to exist; every one must have a top key.
    for (Integer row : valsSet) {
      String fi = String.format("%010d", row);
      // bmfr.seek(new Range(new Text("r"+fi)));
      
      org.apache.accumulo.core.data.Key k1 = new org.apache.accumulo.core.data.Key(new Text("r" + fi), new Text("cf1"));
      bmfr.seek(new Range(k1, true, k1.followingKey(PartialKey.ROW_COLFAM), false), new ArrayList<ByteSequence>(), false);
      
      if (!bmfr.hasTop()) {
        out.println("ERROR 2 " + row);
      }
      
      count++;
      
      if (count >= 500) {
        break;
      }
    }
    
    t2 = System.currentTimeMillis();
    
    // NOTE(review): "count" counts lookups performed (always 500 here), not hits;
    // the message text is kept as-is since it is runtime output.
    out.printf("existant lookup rate %6.2f%n", 500 / ((t2 - t1) / 1000.0));
    out.println("expected hits 500.  Receive hits: " + count);
    bmfr.close();
  }
 +}


Mime
View raw message